Source code for pysurfex.binary_input
"""Input data for surfex binaries."""
import json
import logging
import os
import subprocess
from abc import ABC, abstractmethod
[docs]class JsonOutputData(OutputDataFromSurfexBinaries):
"""Output data."""
[docs] def __init__(self, data):
"""Output data from dict.
Args:
data (dict): Output data.
"""
self.data = data
[docs] def archive_files(self):
"""Archive files."""
for output_file, target in self.data.items():
logging.info("%s -> %s", output_file, target)
command = "mv"
if isinstance(target, dict):
for key in target:
logging.debug("%s %s %s", output_file, key, target[key])
command = target[key]
target = key
cmd = command + " " + output_file + " " + target
try:
logging.info(cmd)
subprocess.check_call(cmd, shell=True) # noqaS602
except IOError:
logging.error("%s failed", cmd)
raise RuntimeError(cmd + " failed") from IOError
[docs]class JsonOutputDataFromFile(JsonOutputData):
"""JSON output data."""
[docs] def __init__(self, file):
"""Construct from json file."""
with open(file, mode="r", encoding="utf-8") as file_handler:
data = json.load(file_handler)
JsonOutputData.__init__(self, data)
[docs]class JsonInputData(InputDataToSurfexBinaries):
"""JSON input data."""
[docs] def __init__(self, data):
"""Construct input data.
Args:
data (dict): Input data.
"""
self.data = data
[docs] def prepare_input(self):
"""Prepare input."""
for target, input_file in self.data.items():
logging.info("%s -> %s", target, input_file)
logging.debug(os.path.realpath(target))
command = None
if isinstance(input_file, dict):
for key in input_file:
logging.debug(key)
logging.debug(input_file[key])
command = str(input_file[key])
input_file = str(key)
command = command.replace("@INPUT@", input_file)
command = command.replace("@TARGET@", target)
if os.path.realpath(target) == os.path.realpath(input_file):
logging.info("Target and input file is the same file")
else:
if command is None:
cmd = "ln -sf " + input_file + " " + target
else:
cmd = command
try:
logging.info(cmd)
subprocess.check_call(cmd, shell=True) # noqaS602
except IOError:
raise (cmd + " failed") from IOError
[docs] def add_data(self, data):
"""Add data.
Args:
data (dict): Data to add
"""
for key in data:
value = data[key]
self.data.update({key: value})
[docs]class JsonInputDataFromFile(JsonInputData):
"""JSON input data."""
[docs] def __init__(self, file):
"""Construct JSON input data.
Args:
file (str): JSON file name
"""
with open(file, mode="r", encoding="utf-8") as file_handler:
data = json.load(file_handler)
JsonInputData.__init__(self, data)
[docs]class InputDataFromNamelist(JsonInputData):
"""Binary input data for offline executables."""
[docs] def __init__(self, nml, input_data, program, platform, basetime=None, validtime=None):
"""Construct InputDataFromNamelist.
Args:
nml (f90nml.Namelist): Namelist
input_data (dict): Input data mapping
program (str): Kind of program
platform (SystemFilePaths): Platform settings
basetime (as_datetime, optional): Baseetime. Defaults to None.
validtime (as_datetime, optional): Validtime. Defaults to None.
Raises:
RuntimeError: Program not defined
"""
self.nml = nml
self.platform = platform
self.basetime = basetime
self.validtime = validtime
try:
self.data = input_data[program]
except KeyError:
raise RuntimeError(f"Could not find program {program}") from KeyError
data = self.process_data()
JsonInputData.__init__(self, data)
[docs] @staticmethod
def get_nml_value2(nml, block, key, indices=None):
"""Get namelist value.
Args:
nml (nmlf90.Namelist): Namelist
block (str): Namelist block
key (str): Namelist key
indices (list, optional): Indices to read. Defaults to None.
Returns:
setting (any): Namelist setting
"""
logging.debug("Checking block=%s key=%s", block, key)
if block in nml:
if key in nml[block]:
logging.debug(nml[block][key])
if indices is not None:
logging.debug("indices=%s", indices)
try:
if len(indices) == 2:
val = nml[block][key][indices[1]][indices[0]]
else:
val = nml[block][key][indices[0]]
logging.debug("Found 1D value %s", val)
if isinstance(val, list):
return None
except IndexError:
return None
except TypeError:
return None
else:
val = nml[block][key]
logging.debug("Found: %s Indices=%s", val, indices)
return val
return None
[docs] @staticmethod
def get_nml_value(nml, block, key, indices=None):
"""Get namelist value.
Args:
nml (nmlf90.Namelist): Namelist
block (str): Namelist block
key (str): Namelist key
indices (list, optional): Indices to read. Defaults to None.
Returns:
setting (any): Namelist setting
"""
logging.debug("Checking block=%s key=%s", block, key)
if block in nml:
if key in nml[block]:
vals = []
val_dict = {}
val = nml[block][key]
logging.debug("namelist type=%s", type(val))
if indices is not None:
logging.debug("indices=%s", indices)
if len(indices) == 2:
val = nml[block][key][indices[1]][indices[0]]
else:
val = nml[block][key][indices[0]]
logging.debug("Found 1D value %s", val)
if isinstance(val, list):
return None
val_dict.update({"value": val, "indices": None})
vals.append(val_dict)
else:
if isinstance(val, list):
dim_size = len(val)
logging.debug("dim_size=%s", dim_size)
dims = []
tval = val
more_dimensions = True
while more_dimensions:
logging.debug("tval=%s type(tval)=%s", tval, type(tval))
if isinstance(tval, int):
more_dimensions = False
else:
logging.debug("type(tval)=%s", type(tval))
if not isinstance(tval, list):
more_dimensions = False
else:
if isinstance(tval[0], int):
more_dimensions = False
else:
logging.debug(
"len(tval)=%s type(tval)=%s", len(tval), type
)
dim_size = len(tval)
dims.append(dim_size)
tval = tval[0]
logging.debug(
"New tval=%s dim_size=%s", tval, dim_size
)
logging.debug("dims=%s", dims)
logging.debug("type(val)=%s", type(val))
if len(dims) == 2:
for i in range(0, dims[0]):
for j in range(0, dims[1]):
val_dict = {}
indices = [j, i]
lval = val[i][j]
val_dict.update({"value": lval, "indices": indices})
logging.debug("value=%s indices=%s", lval, indices)
vals.append(val_dict)
elif len(dims) == 1:
for i in range(0, dims[0]):
val_dict = {}
indices = [i]
logging.debug("i=%s, val[i]=%s", i, val[i])
lval = val[i]
val_dict.update({"value": lval, "indices": indices})
logging.debug("value=%s indices=%s", lval, indices)
vals.append(val_dict)
elif len(dims) == 0:
val_dict = {}
logging.debug("val=%s", val)
val_dict.update({"value": val, "indices": None})
vals.append(val_dict)
else:
val_dict = {}
if isinstance(val, bool):
val = str(val)
val_dict.update({"value": val, "indices": None})
vals.append(val_dict)
logging.debug("Found: value=%s", val_dict["value"])
return vals
return None
[docs] @staticmethod
def get_nml_value_from_string(nml, string, sep="#", indices=None):
"""Get namelist value from a string.
Args:
nml (nmlf90.Namelist): Namelist
string (str): Namelist identifier
sep (str, optional): _description_. Defaults to "#".
indices (list, optional): Indices to read. Defaults to None.
Returns:
setting (any): Namelist setting
"""
nam_section = string.split(sep)[0]
nam_key = string.split(sep)[1]
return InputDataFromNamelist.get_nml_value(
nml, nam_section, nam_key, indices=indices
)
[docs] def substitute(self, key, val, macros=None, micro="@", check_parsing=False):
"""Substitute patterns.
Args:
key (str): _description_
val (str): _description_
macros (dict, optional): Macros. Defaults to None.
micro (str, optional): Micro character. Defaults to "@".
check_parsing (bool, optional): Caheck if values were substituted.
Returns:
dict: Substituted key=value
"""
logging.debug(
"Substitute key=%s and val=%s %s %s", key, val, self.basetime, self.validtime
)
pkey = key
pval = val
for spath_key, spath_val in self.platform.system_file_paths.items():
pkey = pkey.replace(f"{micro}{spath_key}{micro}", spath_val)
pval = pval.replace(f"{micro}{spath_key}{micro}", spath_val)
if macros is not None:
for macro_key, macro_val in macros.items():
pkey = pkey.replace(f"{micro}{macro_key}{micro}", macro_val)
pval = pval.replace(f"{micro}{macro_key}{micro}", macro_val)
pkey = self.platform.parse_setting(
pkey,
validtime=self.validtime,
basedtg=self.basetime,
check_parsing=check_parsing,
)
pval = self.platform.parse_setting(
pval,
validtime=self.validtime,
basedtg=self.basetime,
check_parsing=check_parsing,
)
return pkey, pval
[docs] def read_macro_setting(self, macro_defs, key, default=None, sep="#"):
"""Read a macro setting.
Args:
macro_defs (dict): Macro definition
key (str): Macro setting to get.
default (str, optional): Default value. Defaults to None.
sep (str, optional): Namelist key separator. Defaults to "#".
Returns:
setting (any)
"""
try:
setting = macro_defs[key]
if isinstance(setting, str):
if setting.find(sep) > 0:
logging.debug("Read macro setting from namelist %s", setting)
setting = self.get_nml_value_from_string(self.nml, setting)
if isinstance(setting, list):
setting = setting[0]["value"]
return setting
except KeyError:
return default
[docs] def extend_macro(self, key, val, macros, sep="#"):
"""Extend entries from macro.
Args:
key (_type_): _description_
val (_type_): _description_
macros (dict): Macros
sep (str, optional): Namelist key separator. Defaults to "#".
Raises:
NotImplementedError: _description_
NotImplementedError: _description_
Returns:
dict: Key, value dictionary
"""
logging.debug("extenders=%s", macros)
if macros is None:
return {key: val}
processed_data = {}
for macro, macro_types in macros.items():
loop = {}
for macro_type, macro_defs in macro_types.items():
logging.debug("macro_defs=%s", macro_defs)
if macro_type == "ekfpert":
nncvs = self.read_macro_setting(macro_defs, "list", sep=sep)
logging.debug("nncvs=%s", nncvs)
nncvs = nncvs.copy()
duplicate = self.read_macro_setting(macro_defs, "duplicate", sep=sep)
if duplicate:
nncvs += nncvs
loop.update({"0": "0"})
icounter1 = 1
icounter2 = 1
for nncv in nncvs:
if nncv == 1:
loop.update({str(icounter1): str(icounter2)})
icounter1 += 1
icounter2 += 1
elif macro_type == "dict":
values = self.get_nml_value_from_string(self.nml, macro_defs)
counter = 0
for key, val in values[0].items():
loop.update({str(key): str(val)})
counter += 1
elif macro_type == "iterator":
start = self.read_macro_setting(macro_defs, "start", sep=sep)
end = self.read_macro_setting(macro_defs, "end", sep=sep)
fmt = self.read_macro_setting(
macro_defs, "fmt", sep=sep, default=None
)
if fmt is None:
fmt = "{:d}"
for lval in range(start, end):
lval = fmt.format(lval)
loop.update({str(lval): str(lval)})
else:
raise NotImplementedError
# Loop normal macros not being nml arrays
unprocessed_data = processed_data.copy()
if processed_data:
unprocessed_data = processed_data
else:
unprocessed_data = {key: val}
for key, val in unprocessed_data.items():
for vmacro1, vmacro2 in loop.items():
logging.debug(
"key=%s val=%s macro=%s vmacro1=%s vmacro2=%s",
key,
val,
macro,
vmacro1,
vmacro2,
)
if key.find("#") > 0:
key = self.get_nml_value_from_string(self.nml, key, sep=sep)
pkey = key.replace(f"@{macro}@", vmacro1)
pval = val.replace(f"@{macro}@", vmacro2)
processed_data.update({pkey: pval})
logging.debug("Processed data=%s", processed_data)
return processed_data
[docs] def process_macro(self, key, val, macros, sep="#", indices=None):
"""Process macro.
Args:
key (str): Key
val (str): Value
macros (dict): Macros
sep (str, optional): Namelist key separator. Defaults to "#".
indices (list, optional): Process macro from namelist indices.
Raises:
NotImplementedError: Only 2 dimensions are implemented
Returns:
dict: Key, value dictionary
"""
logging.debug("macros=%s", macros)
if macros is None:
return key, val
logging.debug("indices=%s", indices)
if indices is None:
return key, val
pkey = key
pval = val
for macro in macros:
lindex = None
if len(indices) == 2:
if macro == "DECADE":
lindex = indices[1]
else:
lindex = indices[0]
elif len(indices) == 1:
lindex = indices[0]
elif len(indices) > 2:
raise NotImplementedError("Only 2 dimensions are implemented")
vmacro = None
if lindex is not None:
try:
macro_defs = macros[macro]
logging.debug("macro_defs=%s", macro_defs)
except KeyError:
logging.warning(
"Macro %s not defined. Use index value %s", macro, lindex
)
vmacro = str(lindex + 1)
if macro == "VTYPE":
vmacro = str(lindex + 1)
elif "DECADE" in macros:
ntime = self.read_macro_setting(macro_defs, "ntime", sep=sep)
dec_days = int(360 / float(ntime))
dec_start = int(dec_days / 2)
dec_end = 360 + dec_start
dec = 0
for day in range(dec_start, dec_end, dec_days):
logging.debug("day=%s, dec=%s lindex=%s", day, dec, lindex)
month = int(day / 30) + 1
mday = int(day % 30)
if dec == lindex:
vmacro = f"{month:02d}{mday:02d}"
dec += 1
logging.debug(
"Substitute @%s@ with %s pkey=%s pval=%s", macro, vmacro, pkey, pval
)
if isinstance(pkey, str):
pkey = pkey.replace(f"@{macro}@", vmacro)
if isinstance(pval, str):
pval = pval.replace(f"@{macro}@", vmacro)
logging.debug(
"Substitute @%s@ with %s pkey=%s pval=%s", macro, vmacro, pkey, pval
)
return pkey, pval
[docs] def matching_value(self, data, val, sep="#", indices=None):
"""Match the value. Possibly also read namelist value.
Args:
data (dict): Data to check keys for
val (str): Key to find
sep (str, optional): Namelist separator. Defaults to "#".
indices(list, optional): Indices in namelist
Raises:
RuntimeError: "Malformed input data"
Returns:
dict: Matching entry in data.
"""
if val == "macro" or val == "extenders":
return None
logging.debug("type(data)=%s", type(data))
logging.debug("type(val)=%s", type(val))
logging.debug("indices=%s", indices)
if isinstance(data, dict):
mdata = data.keys()
else:
mdata = [data]
val = str(val)
logging.debug("Check if val=%s matches mdata=%s", val, mdata)
sval = None
for mval in mdata:
if val.find(sep) > 0:
logging.debug("val=%s is a namelist variable", val)
sval = self.get_nml_value_from_string(self.nml, val, indices=indices)
logging.debug("Got sval=%s", sval)
if sval is None:
return None
indices = sval[0]["indices"]
sval = sval[0]["value"]
if mval == val:
logging.debug("Found matching data. val=%s data=%s", val, data)
try:
rval = data[val]
except TypeError:
raise RuntimeError("Malformed input data") from TypeError
if sval is not None:
rval = {sval: rval}
logging.debug("Return data rval=%s", rval)
return rval
logging.warning("Value=%s not found in data", val)
return None
[docs] def process_data(self, sep="#"):
"""Process input definitions on files to map.
Args:
sep (str, optional): Namelist separator. Defaults to "#".
Returns:
mapped_data (dict): A dict with mapped local names and target files.
"""
logging.debug("Process data: %s", self.data)
def _process_data(mapped_data, data, indices=None, macros=None, extenders=None):
for key, value in data.items():
logging.debug(".................. key=%s", key)
# Required namelist variable
if key.find(sep) > 0:
vals = self.get_nml_value_from_string(self.nml, key, indices=indices)
else:
vals = [{"value": value, "indices": None}]
if isinstance(vals, list):
for val_dict in vals:
logging.debug("=========== val_dict=%s", val_dict)
val = val_dict["value"]
indices = val_dict["indices"]
setting = self.matching_value(
value, val, sep=sep, indices=indices
)
logging.debug("Setting=%s", setting)
if setting is not None:
if "macros" in setting:
macros = setting.copy()
macros = macros["macros"]
if "extenders" in setting:
extenders = setting.copy()
extenders = extenders["extenders"]
last_dict = True
if isinstance(setting, dict):
for __, tval in setting.items():
if isinstance(tval, dict):
last_dict = False
else:
print(setting)
if not last_dict:
logging.debug(
"------ Call next loop. setting=%s", setting
)
_process_data(
mapped_data,
setting,
indices=indices,
macros=macros,
extenders=extenders,
)
else:
for key2, value2 in setting.items():
logging.debug(
"Setting1 key=%s value=%s indices=%s",
key2,
value2,
indices,
)
if key2.find(sep) > 0:
keys = self.get_nml_value_from_string(
self.nml, key2, indices=indices
)
key2 = keys[0]["value"]
processed = False
logging.debug(
"Setting2 key=%s value=%s indices=%s",
key2,
value2,
indices,
)
if macros is not None:
processed = True
key3, value3 = self.process_macro(
key2, value2, macros, indices=indices
)
if value3.endswith(".dir"):
dir_key = key3 + ".dir"
dir_val = value3
hdr_key = key3 + ".hdr"
hdr_val = value3.replace(".dir", ".hdr")
hdr_key, hdr_val = self.substitute(
hdr_key, hdr_val
)
dir_key, dir_val = self.substitute(
dir_key, dir_val
)
mapped_data.update({hdr_key: hdr_val})
mapped_data.update({dir_key: dir_val})
elif value3.endswith(".nc"):
my_key, my_val = self.substitute(key3, value3)
logging.debug(
"my_key=%s, my_val=%s", my_key, my_val
)
if not my_key.endswith(".nc"):
my_key = my_key + ".nc"
mapped_data.update({my_key: my_val})
else:
my_key, my_val = self.substitute(key3, value3)
mapped_data.update({my_key: my_val})
if extenders is not None:
processed = True
processed_values = self.extend_macro(
key2, value2, extenders
)
for pkey3, pval3 in processed_values.items():
logging.debug(
"pkey3=%s pval3=%s", pkey3, pval3
)
pkey3, pval3 = self.substitute(pkey3, pval3)
logging.debug(
"pkey3=%s pval3=%s", pkey3, pval3
)
mapped_data.update({pkey3: pval3})
if not processed:
pkey3 = key2
pval3 = value2
logging.debug("pkey3=%s pval3=%s", pkey3, pval3)
if pval3.endswith(".nc"):
if not pkey3.endswith(".nc"):
pkey3 = pkey3 + ".nc"
pkey3, pval3 = self.substitute(pkey3, pval3)
mapped_data.update({pkey3: pval3})
indices = None
else:
if key not in ["macros", "extenders"]:
logging.warning(
"Could not match key=%s value=%s", key, val
)
else:
logging.warning("Could not find namelist key=%s", key)
indices = None
mapped_data = {}
_process_data(mapped_data, self.data)
logging.debug("Mapped data=%s", mapped_data)
return mapped_data