"""Namelist."""
import collections
import logging
import re
import f90nml
from .binary_input import InputDataFromNamelist
[docs]class NamelistGenerator(object):
"""Namelist class."""
[docs] def __init__(self, program, config, definitions, assemble=None, consistency=True):
"""Construct a base namelists class.
Args:
program (str): Which surfex binary you want to run ["pgd", "prep", "offline", "soda"]
config (surfex.Configuration): A SURFEX configuration object
definitions (dict): Namelist definitions
assemble(dict, optional): Assembly order. Default to None.
consistency (bool, optional): Check configuration consistency. Defaults to True.
"""
self.program = program
self.config = config
self.nldict = definitions
nobstype = 0
if program == "soda" or program == "offline" or program == "perturbed":
nnco = self.config.get_setting("SURFEX#ASSIM#OBS#NNCO")
nobstype = 0
for __, obs_val in enumerate(nnco):
if obs_val == 1:
nobstype += 1
self.config.update_setting("SURFEX#SODA#NOBSTYPE", nobstype)
soil_assim = self.config.get_setting("SURFEX#ASSIM#SCHEMES#ISBA")
nncv = None
if soil_assim == "EKF":
nncv = self.config.get_setting("SURFEX#ASSIM#ISBA#EKF#NNCV")
if soil_assim == "ENKF":
nncv = self.config.get_setting("SURFEX#ASSIM#ISBA#ENKF#NNCV")
nvar = 0
if nncv is not None:
for __, cval in enumerate(nncv):
if cval == 1:
nvar += 1
self.config.update_setting("SURFEX#SODA#NVAR", nvar)
if program == "soda":
laesnm = False
hh = self.config.get_setting("SURFEX#SODA#HH")
if hh in self.config.get_setting("SURFEX#ASSIM#ISBA#UPDATE_SNOW_CYCLES"):
laesnm = True
self.config.update_setting("SURFEX#SODA#LAESNM", laesnm)
macros = self.flatten_config()
self.macros = macros
if assemble is None:
self.assemble = self.namelist_blocks()
else:
self.assemble = assemble
logging.debug(self.assemble)
nlres = self.assemble_namelist()
self.nml = f90nml.Namelist(nlres)
if consistency:
problems = self.concistency(self.nml)
if problems:
logging.warning("Found problems!")
for problem, info in problems.items():
for level, desc in info.items():
if level == "SEVERE":
logging.error(
"Problem with: %s Description=%s", problem, desc
)
elif level == "WARNING":
logging.warning(
"Problem with: %s Description=%s", problem, desc
)
else:
logging.info("Problem with: %s Description=%s", problem, desc)
[docs] def flatten_config(self):
"""Flatten dictionary.
Returns:
source(dict): Flat dict with settings
"""
def _flatten_dict_gen(dic, parent_key, sep):
for key, val in dic.items():
new_key = parent_key + sep + key if parent_key else key
if isinstance(val, collections.abc.MutableMapping):
yield from flatten_dict(val, new_key, sep=sep).items()
else:
if isinstance(val, tuple):
val = list(val)
yield new_key, val
def flatten_dict(
d: collections.abc.MutableMapping, parent_key: str = "", sep: str = "#"
):
return dict(_flatten_dict_gen(d, parent_key, sep))
return flatten_dict(self.config.settings)
[docs] def namelist_blocks(self):
"""Construct building blocks for the namelist genrator."""
logging.info("Building namelist blocks for program: %s", self.program)
input_blocks = ["io", "constants", "treedrag", "flake"]
lisba = False
if self.config.get_setting("SURFEX#ISBA#SCHEME") == "DIF":
lisba = True
elif self.config.get_setting("SURFEX#ISBA#SCHEME") == "3-L":
lisba = True
elif self.config.get_setting("SURFEX#ISBA#SCHEME") == "2-L":
lisba = True
# Program specific settings
if self.program == "pgd":
input_blocks += ["pgd", "pgd_cover", "pgd_zs"]
eco_sg = self.config.get_setting("SURFEX#COVER#SG")
if eco_sg:
input_blocks += ["pgd_ecoclimap_sg"]
else:
input_blocks += ["pgd_ecoclimap"]
# Set ISBA properties
if lisba:
input_blocks += ["pgd_isba"]
if self.config.get_setting("SURFEX#ISBA#SCHEME") == "DIF":
input_blocks += ["pgd_isba_dif"]
elif self.config.get_setting("SURFEX#ISBA#SCHEME") == "3-L":
input_blocks += ["pgd_isba_3l"]
elif self.config.get_setting("SURFEX#ISBA#SCHEME") == "2-L":
input_blocks += ["pgd_isba_2l"]
# Set MEB
if self.config.get_setting("SURFEX#ISBA#MEB"):
input_blocks += ["pgd_meb"]
# RSMIN
if self.config.get_setting("SURFEX#COVER#SG"):
input_blocks += ["pgd_rsmin_sg"]
else:
input_blocks += ["pgd_rsmin"]
# CV
if self.config.get_setting("SURFEX#COVER#SG"):
input_blocks += ["pgd_cv_sg"]
else:
input_blocks += ["pgd_cv"]
# Treedrag
input_blocks += ["pgd_treedrag"]
if self.config.get_setting("SURFEX#TOWN#LTOWN_TO_ROCK"):
input_blocks += ["pgd_town_to_rock"]
if self.config.get_setting("SURFEX#TILES#INLAND_WATER") == "FLAKE":
input_blocks += ["pgd_flake"]
# Sea
input_blocks += ["pgd_sea"]
elif self.program == "prep":
input_blocks += ["prep"]
# SEAFLX settings
if self.config.get_setting("SURFEX#TILES#SEA") == "SEAFLX":
input_blocks += ["prep_seaflux", "prep_seaflx"]
if self.config.get_setting("SURFEX#SEA#ICE") == "SICE":
input_blocks += ["prep_sice"]
if self.config.get_setting("SURFEX#TILES#INLAND_WATER") == "FLAKE":
input_blocks += ["prep_flake"]
# ISBA
if self.config.get_setting("SURFEX#ISBA#SCHEME") == "DIF":
input_blocks += ["prep_isba", "prep_isba_dif"]
elif self.config.get_setting("SURFEX#ISBA#SCHEME") == "3-L":
input_blocks += ["prep_isba", "prep_isba_3l"]
elif self.config.get_setting("SURFEX#ISBA#SCHEME") == "2-L":
input_blocks += ["prep_isba", "prep_isba_2l"]
if lisba:
# ISBA CANOPY
if self.config.get_setting("SURFEX#ISBA#CANOPY"):
input_blocks += ["prep_isba_canopy"]
# Snow
input_blocks += ["prep_isba_snow"]
if self.config.get_setting("SURFEX#ISBA#SNOW") == "D95":
input_blocks += ["prep_isba_snow_d95"]
elif self.config.get_setting("SURFEX#ISBA#SNOW") == "3-L":
input_blocks += ["prep_isba_snow_3l"]
if self.config.get_setting("SURFEX#ISBA#SNOW") == "CRO":
input_blocks += ["prep_isba_snow_cro"]
if self.config.get_setting("SURFEX#PREP#FILE") is None:
input_blocks += ["prep_from_namelist"]
else:
input_blocks += ["prep_from_file"]
if self.config.get_setting("SURFEX#PREP#FILEPGD") is not None:
input_blocks += ["prep_from_file_with_pgd"]
elif self.program == "offline" or self.program == "perturbed":
input_blocks += ["offline"]
if self.config.get_setting("SURFEX#IO#LSELECT"):
input_blocks += ["offline_selected_output"]
# SEAFLX settings
if self.config.get_setting("SURFEX#TILES#SEA") == "SEAFLX":
input_blocks += ["offline_seaflux", "offline_seaflx"]
if self.config.get_setting("SURFEX#SEA#ICE") == "SICE":
input_blocks += ["offline_sice"]
# ISBA settings
if lisba:
input_blocks += ["offline_isba"]
if self.config.get_setting("SURFEX#ISBA#SCHEME") == "DIF":
input_blocks += ["offline_isba_dif"]
if self.config.get_setting("SURFEX#ISBA#PERTSURF"):
input_blocks += ["offline_isba_pertsurf"]
# SSO
input_blocks += ["offline_sso"]
sso = self.config.get_setting("SURFEX#SSO#SCHEME").lower()
input_blocks += ["offline_sso_" + sso]
# Perturbed offline settings
if self.program == "perturbed":
if lisba:
input_blocks += ["offline_pert_isba_settings"]
if self.config.get_setting("SURFEX#ASSIM#SCHEMES#ISBA") == "EKF":
input_blocks += ["offline_pert_isba_ekf"]
if self.config.get_setting("SURFEX#ASSIM#SCHEMES#ISBA") == "ENKF":
input_blocks += ["offline_pert_isba_enkf"]
input_blocks += ["offline_pert_obs"]
# Climate setting
if self.config.get_setting("SURFEX#SEA#LVOLATILE_SIC"):
input_blocks += ["offline_volatile_sic"]
elif self.program == "soda":
input_blocks += ["soda", "soda_obs"]
if self.config.get_setting("SURFEX#SEA#ICE") == "SICE":
input_blocks += ["soda_sice"]
# Set OI settings
if self.config.get_setting("SURFEX#ASSIM#SCHEMES#ISBA") == "OI":
input_blocks += ["soda_isba_oi"]
elif self.config.get_setting("SURFEX#ASSIM#SCHEMES#ISBA") == "EKF":
input_blocks += ["soda_isba_ekf"]
elif self.config.get_setting("SURFEX#ASSIM#SCHEMES#ISBA") == "ENKF":
input_blocks += ["soda_isba_enkf"]
# Town
if self.config.get_setting("SURFEX#ASSIM#SCHEMES#TEB").lower() != "none":
input_blocks += ["soda_teb"]
else:
raise NotImplementedError(self.program)
logging.info("Generated input_blocks: %s", input_blocks)
return input_blocks
[docs] def get_namelist(self):
"""Get namelist."""
return self.nml
[docs] @staticmethod
def check_nml_setting(problems, nml, block, key, value):
"""Check namelist settings.
Args:
problems (dict): Problems
nml (f90nml.Namelist): Namelist
block (str): Block
key (str): Key
value (any): Value
Returns:
problems (dict): Problems
"""
ckey = block + "#" + key
if block in nml:
if key in nml[block]:
if nml[block][key] != value:
msg = f"Mismatch: {nml[block][key]} != {value}"
problems.update({ckey: {"SEVERE": msg}})
else:
problems.update({ckey: {"WARNING": "Namelist key not found"}})
else:
problems.update({ckey: {"WARNING": "Namelist block not found"}})
return problems
[docs] def concistency(self, nml):
"""Check if namelist is consistent with config.
Args:
nml (f90nml.Namelist): A parsed f90nml namelist
Raises:
NotImplementedError: Mode is not implemented
Returns:
problems (dict): Problems.
"""
logging.info("Check namelist input for program: %s", self.program)
problems = {}
problems = self.check_nml_setting(
problems,
nml,
"NAM_IO_OFFLINE",
"CSURF_FILETYPE",
self.config.get_setting("SURFEX#IO#CSURF_FILETYPE"),
)
problems = self.check_nml_setting(
problems,
nml,
"NAM_IO_OFFLINE",
"CTIMESERIES_FILETYPE",
self.config.get_setting("SURFEX#IO#CTIMESERIES_FILETYPE"),
)
problems = self.check_nml_setting(
problems,
nml,
"NAM_IO_OFFLINE",
"CFORCING_FILETYPE",
self.config.get_setting("SURFEX#IO#CFORCING_FILETYPE"),
)
problems = self.check_nml_setting(
problems,
nml,
"NAM_IO_OFFLINE",
"XTSTEP_SURF",
self.config.get_setting("SURFEX#IO#XTSTEP"),
)
problems = self.check_nml_setting(
problems,
nml,
"NAM_IO_OFFLINE",
"XTSTEP_OUTPUT",
self.config.get_setting("SURFEX#IO#XTSTEP_OUTPUT"),
)
problems = self.check_nml_setting(
problems,
nml,
"NAM_WRITE_SURF_ATM",
"LSPLIT_PATCH",
self.config.get_setting("SURFEX#IO#LSPLIT_PATCH"),
)
# Constants and parameters
problems = self.check_nml_setting(
problems,
nml,
"NAM_SURF_ATM",
"XRIMAX",
self.config.get_setting("SURFEX#PARAMETERS#XRIMAX"),
)
# Program specific settings
if self.program == "pgd":
problems = self.check_nml_setting(
problems,
nml,
"NAM_PGD_SCHEMES",
"CSEA",
self.config.get_setting("SURFEX#TILES#SEA"),
)
problems = self.check_nml_setting(
problems,
nml,
"NAM_PGD_SCHEMES",
"CWATER",
self.config.get_setting("SURFEX#TILES#INLAND_WATER"),
)
problems = self.check_nml_setting(
problems,
nml,
"NAM_PGD_SCHEMES",
"CNATURE",
self.config.get_setting("SURFEX#TILES#NATURE"),
)
problems = self.check_nml_setting(
problems,
nml,
"NAM_PGD_SCHEMES",
"CTOWN",
self.config.get_setting("SURFEX#TILES#TOWN"),
)
if self.config.get_setting("SURFEX#TOWN#LTOWN_TO_ROCK"):
if self.config.get_setting("SURFEX#TILES#TOWN") != "NONE":
logging.warning(
"WARNING: TOWN is not NONE and you want LTOWN_TO_ROCK. "
"Setting it to NONE!"
)
problems = self.check_nml_setting(
problems,
nml,
"NAM_FRAC",
"LECOSG",
self.config.get_setting("SURFEX#COVER#SG"),
)
fname = str(self.config.get_setting("SURFEX#COVER#YCOVER"))
__, filetype = self.get_filetype_from_suffix(fname)
if filetype is not None:
problems = self.check_nml_setting(
problems, nml, "NAM_COVER", "YCOVERFILETYPE", filetype
)
# ZS
fname = str(self.config.get_setting("SURFEX#ZS#YZS"))
__, filetype = self.get_filetype_from_suffix(fname)
if filetype is not None:
problems = self.check_nml_setting(
problems, nml, "NAM_ZS", "YZSFILETYPE", filetype
)
# Set ISBA properties
if self.config.get_setting("SURFEX#ISBA#SCHEME") == "DIF":
problems = self.check_nml_setting(
problems, nml, "NAM_ISBA", "CISBA", "DIF"
)
problems = self.check_nml_setting(
problems, nml, "NAM_ISBA", "NGROUND_LAYER", 14
)
elif self.config.get_setting("SURFEX#ISBA#SCHEME") == "3-L":
problems = self.check_nml_setting(
problems, nml, "NAM_ISBA", "CISBA", "3-L"
)
problems = self.check_nml_setting(
problems, nml, "NAM_ISBA", "NGROUND_LAYER", 3
)
elif self.config.get_setting("SURFEX#ISBA#SCHEME") == "2-L":
problems = self.check_nml_setting(
problems, nml, "NAM_ISBA", "CISBA", "2-L"
)
problems = self.check_nml_setting(
problems, nml, "NAM_ISBA", "NGROUND_LAYER", 2
)
# Set patches
problems = self.check_nml_setting(
problems,
nml,
"NAM_ISBA",
"NPATCH",
self.config.get_setting("SURFEX#ISBA#NPATCH"),
)
# Set MEB
problems = self.check_nml_setting(
problems,
nml,
"NAM_ISBA",
"LMEB",
self.config.get_setting("SURFEX#ISBA#MEB"),
)
elif self.program == "prep":
pass
elif self.program == "offline" or self.program == "perturbed":
pass
elif self.program == "soda":
pass
else:
raise NotImplementedError(self.program)
return problems
[docs] @staticmethod
def get_filetype_from_suffix(fname):
"""Get the file type from suffix.
Args:
fname (str): File name
Returns:
tuple: fname, format
"""
if fname.endswith(".dir"):
fname = fname.replace(".dir", "")
return fname, "DIRECT"
if fname.endswith(".ascllv"):
fname = fname.replace(".ascllv", "")
return fname, "ASCLLV"
return fname, None
def input_data_from_namelist(
self, input_data, platform, basetime=None, validtime=None
):
"""Construct a base namelists class to be implemented by namelist implementations.
Args:
input_data (dict): Input data definitions
platform (SystemFilePaths): Platform specific settings
basetime (as_datetime, optional): Base time
validtime (as_datetime, optional): Valid time
Returns:
data_obj (InputDataFromNamelist): Input data from namelist
"""
logging.info("Set input data from namelist for program: %s", self.program)
data_obj = InputDataFromNamelist(
self.nml,
input_data,
self.program,
platform,
basetime=basetime,
validtime=validtime,
)
return data_obj
[docs] def assemble_namelist(self):
"""Generate the namelists for 'target'.
Raises:
KeyError: Key not found
Returns:
nlres (dict): Assembled namelist
"""
# Read namelist file with all the categories
# Check target is valid
cndict = self.assemble
nldict = self.nldict
# Start with empty result dictionary
nlres = {}
# Assemble the target namelists based on the given category order
for item in self.flatten_list(cndict):
catg = item
# variable substitution removed at this level (may be resurrected)
# assemble namelists for this category
if catg in nldict:
for nl in nldict[catg]:
if nl not in nlres:
# create the result namelist dict
nlres[nl] = {}
if catg == "rm{" + nl + "}":
# clear/remove the given namelist (but not used for now)
nlres[nl].clear()
else:
for key in nldict[catg][nl]:
val = nldict[catg][nl][key]
finval = val
# Replace ${var-def} with value from config, possibly macro-expanded
# For now assumes only one subst. per line, could be generalized if needed
if str(finval).find("$") >= 0:
m = re.search(
r"^([^\$]*)\$\{([\w\#]+)\-?([^}]*)\}(.*)", str(val)
)
if m:
pre = m.group(1)
nam = m.group(2)
defval = m.group(3)
post = m.group(4)
logging.debug("macros=%s", self.macros)
logging.debug("look for nam=%s", nam)
try:
repval = self.macros[nam]
except KeyError:
repval = None
if repval is None:
if defval != "":
logging.debug(
"Using default value %s for '%s'",
defval,
nam,
)
repval = self.find_num(defval)
else:
logging.debug("No value found for: '%s'", nam)
else:
logging.debug(
"Replaced %s with: %s", nam, str(repval)
)
if isinstance(repval, str):
finval = str(pre) + str(repval) + str(post)
else:
finval = repval
else:
raise KeyError(val)
nlres[nl][key] = finval
else:
logging.info("Category %s not found in definitions", catg)
return nlres
[docs] def write(self, output_file):
"""Generate the namelists for 'target'.
Args:
output_file (str): where to write the result (OPTIONS.nam, fort.4 or EXSEG1.nam typically)
"""
self.nml.uppercase = True
self.nml.true_repr = ".TRUE."
self.nml.false_repr = ".FALSE."
self.nml.write(output_file, force=True)
logging.debug("Wrote: %s", output_file)
[docs] @staticmethod
def flatten_list(li):
"""Recursively flatten a list of lists (of lists)."""
if li == []:
return li
if isinstance(li[0], list):
return NamelistGenerator.flatten_list(li[0]) + NamelistGenerator.flatten_list(
li[1:]
)
return li[:1] + NamelistGenerator.flatten_list(li[1:])
[docs] @staticmethod
def find_num(s):
"""Purpose: un-quote numbers."""
try:
i = int(s)
return i
except ValueError:
pass
try:
f = float(s)
return f
except ValueError:
return s