Source code for pysurfex.cli

"""Command line interfaces."""
import json
import logging
import os
import sys

import numpy as np
import toml
import yaml

try:
    import matplotlib.pyplot as plt
except ModuleNotFoundError:
    plt = None


from .binary_input import JsonOutputDataFromFile
from .binary_input_legacy import (
    InlineForecastInputData,
    OfflineInputData,
    PgdInputData,
    PrepInputData,
    SodaInputData,
)
from .cache import Cache
from .cmd_parsing import (
    parse_args_bufr2json,
    parse_args_create_forcing,
    parse_args_create_namelist,
    parse_args_dump_environ,
    parse_args_first_guess_for_oi,
    parse_args_gridpp,
    parse_args_hm2pysurfex,
    parse_args_lsm_file_assim,
    parse_args_masterodb,
    parse_args_merge_qc_data,
    parse_args_modify_forcing,
    parse_args_obs2json,
    parse_args_oi2soda,
    parse_args_plot_field,
    parse_args_plot_points,
    parse_args_qc2obsmon,
    parse_args_set_geo_from_obs_set,
    parse_args_set_geo_from_stationlist,
    parse_args_shape2ign,
    parse_args_surfex_binary,
    parse_args_titan,
    parse_cryoclim_pseudoobs,
    parse_sentinel_obs,
    parse_set_domain,
)
from .configuration import (
    ConfigurationFromHarmonieAndConfigFile,
    ConfigurationFromTomlFile,
)
from .datetime_utils import as_datetime, as_datetime_args, as_timedelta
from .file import PGDFile, PREPFile, SURFFile
from .forcing import modify_forcing, run_time_loop, set_forcing_config
from .geo import LonLatVal, get_geo_object, set_domain, shape2ign
from .input_methods import create_obsset_file, get_datasources, set_geo_from_obs_set
from .interpolation import horizontal_oi
from .namelist import NamelistGenerator
from .namelist_legacy import BaseNamelist, Namelist
from .netcdf import (
    create_netcdf_first_guess_template,
    oi2soda,
    read_first_guess_netcdf_file,
    write_analysis_netcdf_file,
)
from .obs import Observation
from .obsmon import write_obsmon_sqlite_file
from .platform_deps import SystemFilePathsFromFile
from .pseudoobs import CryoclimObservationSet, SentinelObservationSet
from .read import ConvertedInput, Converter
from .run import BatchJob, Masterodb, PerturbedOffline, SURFEXBinary
from .titan import (
    TitanDataSet,
    dataset_from_file,
    define_quality_control,
    merge_json_qc_data_sets,
)
from .variable import Variable


[docs]def get_geo_and_config_from_cmd(**kwargs): """Get geo and config from cmd.""" if "harmonie" in kwargs and kwargs["harmonie"]: config_exp = None if "config_exp" in kwargs: if kwargs["config_exp"] is not None: config_exp = kwargs["config_exp"] if config_exp is None: config_exp = ( f"{os.path.abspath(os.path.dirname(__file__))}/cfg/config_exp_surfex.toml" ) logging.info("Using default config from: %s", config_exp) config = ConfigurationFromHarmonieAndConfigFile(os.environ, config_exp) geo = config.geo else: if "domain" in kwargs: domain = kwargs["domain"] if os.path.exists(domain): with open(domain, mode="r", encoding="utf-8") as fhandler: geo = get_geo_object(json.load(fhandler)) else: raise FileNotFoundError(domain) else: geo = None if "config" in kwargs: config = kwargs["config"] if os.path.exists(config): config = ConfigurationFromTomlFile(config) else: raise FileNotFoundError("File not found: " + config) else: config = None return config, geo
[docs]def run_first_guess_for_oi(**kwargs): """Run first guess for oi.""" config, geo = get_geo_and_config_from_cmd(**kwargs) config_file = kwargs["input_config"] if not os.path.exists(config_file): raise FileNotFoundError(config_file) if "output" in kwargs: output = kwargs["output"] else: raise RuntimeError("No output file provided") dtg = kwargs["dtg"] validtime = as_datetime(dtg) variables = kwargs["variables"] variables = variables + ["altitude", "land_area_fraction"] cache = Cache(3600) f_g = None for var in variables: inputfile = kwargs.get("inputfile") fileformat = kwargs.get("inputformat") logging.debug("inputfile: %s", inputfile) logging.debug("fileformat: %s", fileformat) converter = "none" if var == "air_temperature_2m": if "t2m_file" in kwargs and kwargs["t2m_file"] is not None: inputfile = kwargs["t2m_file"] if "t2m_format" in kwargs and kwargs["t2m_format"] is not None: fileformat = kwargs["t2m_format"] if "t2m_converter" in kwargs and kwargs["t2m_converter"] is not None: converter = kwargs["t2m_converter"] elif var == "relative_humidity_2m": if "rh2m_file" in kwargs and kwargs["rh2m_file"] is not None: inputfile = kwargs["rh2m_file"] if "rh2m_format" in kwargs and kwargs["rh2m_format"] is not None: fileformat = kwargs["rh2m_format"] if "rh2m_converter" in kwargs and kwargs["rh2m_converter"] is not None: converter = kwargs["rh2m_converter"] elif var == "surface_snow_thickness": if "sd_file" in kwargs and kwargs["sd_file"] is not None: inputfile = kwargs["sd_file"] if "sd_format" in kwargs and kwargs["sd_format"] is not None: fileformat = kwargs["sd_format"] if "sd_converter" in kwargs and kwargs["sd_converter"] is not None: converter = kwargs["sd_converter"] elif var == "sea_ice_thickness": if "icetk_file" in kwargs and kwargs["icetk_file"] is not None: inputfile = kwargs["icetk_file"] if "icetk_format" in kwargs and kwargs["icetk_format"] is not None: fileformat = kwargs["icetk_format"] if "icetk_converter" in kwargs and kwargs["icetk_converter"] is not None: converter = kwargs["icetk_converter"] elif var == "cloud_base": if "cb_file" in kwargs and kwargs["cb_file"] is not None: inputfile = kwargs["cb_file"] if "cb_format" in kwargs and kwargs["cb_format"] is not None: fileformat = kwargs["cb_format"] if "cb_converter" in kwargs and kwargs["cb_converter"] is not None: converter = kwargs["cb_converter"] elif var == "surface_soil_moisture": if "sm_file" in kwargs and kwargs["sm_file"] is not None: inputfile = kwargs["sm_file"] if "sm_format" in kwargs and kwargs["sm_format"] is not None: fileformat = kwargs["sm_format"] if "sm_converter" in kwargs and kwargs["sm_converter"] is not None: converter = kwargs["sm_converter"] elif var == "altitude": if "altitude_file" in kwargs and kwargs["altitude_file"] is not None: inputfile = kwargs["altitude_file"] if "altitude_format" in kwargs and kwargs["altitude_format"] is not None: fileformat = kwargs["altitude_format"] if ( "altitude_converter" in kwargs and kwargs["altitude_converter"] is not None ): converter = kwargs["altitude_converter"] elif var == "land_area_fraction": if "laf_file" in kwargs and kwargs["laf_file"] is not None: inputfile = kwargs["laf_file"] if "laf_format" in kwargs and kwargs["laf_format"] is not None: fileformat = kwargs["laf_format"] if "laf_converter" in kwargs and kwargs["laf_converter"] is not None: converter = kwargs["laf_converter"] else: raise NotImplementedError("Variable not implemented " + var) if inputfile is None: raise RuntimeError("You must set input file") if fileformat is None: raise RuntimeError("You must set file format") logging.debug(inputfile) logging.debug(fileformat) logging.debug(converter) config = yaml.safe_load(open(config_file, "r", encoding="utf-8")) defs = config[fileformat] defs.update({"filepattern": inputfile}) logging.debug("Variable: %s", var) logging.debug("Fileformat: %s", fileformat) converter_conf = config[var][fileformat]["converter"] if converter not in config[var][fileformat]["converter"]: logging.debug("config_file: %s", config_file) logging.debug("config: %s", config) raise RuntimeError(f"No converter {converter} definition found in {config}!") initial_basetime = validtime - as_timedelta(seconds=10800) converter = Converter( converter, initial_basetime, defs, converter_conf, fileformat ) field = ConvertedInput(geo, var, converter).read_time_step(validtime, cache) field = np.reshape(field, [geo.nlons, geo.nlats]) # Create file if f_g is None: n_x = geo.nlons n_y = geo.nlats f_g = create_netcdf_first_guess_template(variables, n_x, n_y, output, geo=geo) epoch = float( (validtime - as_datetime_args(year=1970, month=1, day=1)).total_seconds() ) f_g.variables["time"][:] = epoch f_g.variables["longitude"][:] = np.transpose(geo.lons) f_g.variables["latitude"][:] = np.transpose(geo.lats) f_g.variables["x"][:] = list(range(0, n_x)) f_g.variables["y"][:] = list(range(0, n_y)) if var == "altitude": field[field < 0] = 0 if np.isnan(np.sum(field)): fill_nan_value = f_g.variables[var].getncattr("_FillValue") logging.info("Field %s got Nan. Fill with: %s", var, str(fill_nan_value)) field[np.where(np.isnan(field))] = fill_nan_value f_g.variables[var][:] = np.transpose(field) if f_g is not None: f_g.close()
[docs]def run_masterodb(**kwargs): """Run masterodb.""" logging.debug("ARGS: %s", kwargs) config, geo = get_geo_and_config_from_cmd(**kwargs) if "config" in kwargs: del kwargs["config"] system_file_paths = kwargs["system_file_paths"] if os.path.exists(system_file_paths): system_file_paths = SystemFilePathsFromFile(system_file_paths) else: raise FileNotFoundError("File not found: " + system_file_paths) del kwargs["system_file_paths"] binary = kwargs["binary"] rte = kwargs["rte"] wrapper = kwargs["wrapper"] namelist_path = kwargs["namelist_path"] force = kwargs["force"] mode = kwargs["mode"] output = kwargs["output"] archive = kwargs["archive"] only_archive = kwargs["only_archive"] print_namelist = kwargs["print_namelist"] check_existence = True if "tolerate_missing" in kwargs: if kwargs["tolerate_missing"]: check_existence = False dtg = None if "dtg" in kwargs: if kwargs["dtg"] is not None and isinstance(kwargs["dtg"], str): dtg = as_datetime(kwargs["dtg"]) kwargs.update({"dtg": dtg}) if dtg is not None: config.update_setting("SURFEX#SODA#HH", f"{dtg.hour:02d}") try: basetime = kwargs["basetime"] if isinstance(basetime, str): basetime = as_datetime(basetime) except KeyError: basetime = None try: input_binary_data = kwargs["input_binary_data"] except KeyError: input_binary_data = None # TODO perturbed_file_pattern = None if perturbed_file_pattern in kwargs: perturbed_file_pattern = kwargs["perturbed_file_pattern"] pgd_file_path = kwargs["pgd"] prep_file_path = kwargs["prep"] if os.path.exists(rte): with open(rte, mode="r", encoding="utf-8") as file_handler: rte = json.load(file_handler) my_batch = BatchJob(rte, wrapper=wrapper) else: raise FileNotFoundError my_archive = None if archive is not None: if os.path.exists(archive): my_archive = JsonOutputDataFromFile(archive) else: raise FileNotFoundError if mode == "forecast": mode = "offline" elif mode == "canari": mode = "soda" else: raise NotImplementedError(mode + " is not implemented!") if os.path.isfile(namelist_path): with open(namelist_path, mode="r", encoding="utf-8") as file_handler: nam_defs = yaml.safe_load(file_handler) nam_gen = NamelistGenerator(mode, config, nam_defs) my_settings = nam_gen.nml if input_binary_data is None: raise RuntimeError("input_binary_data not set") with open(input_binary_data, mode="r", encoding="utf-8") as file_handler: input_binary_data = json.load(file_handler) input_data = nam_gen.input_data_from_namelist( input_binary_data, system_file_paths, validtime=dtg, basetime=basetime ) elif os.path.isdir(namelist_path): if mode == "offline": input_data = InlineForecastInputData( config, system_file_paths, check_existence=check_existence ) elif mode == "soda": input_data = SodaInputData( config, system_file_paths, check_existence=check_existence, perturbed_file_pattern=perturbed_file_pattern, dtg=dtg, ) else: raise NotImplementedError(mode + " is not implemented!") blocks = False if blocks: my_settings = Namelist( mode, config, namelist_path, dtg=dtg, fcint=3 ).get_namelist() else: my_settings = BaseNamelist( mode, config, namelist_path, dtg=dtg, fcint=3 ).get_namelist() geo.update_namelist(my_settings) else: raise NotImplementedError # Create input my_format = my_settings["nam_io_offline"]["csurf_filetype"] my_pgdfile = my_settings["nam_io_offline"]["cpgdfile"] my_prepfile = my_settings["nam_io_offline"]["cprepfile"] my_surffile = my_settings["nam_io_offline"]["csurffile"] lfagmap = False if "lfagmap" in my_settings["nam_io_offline"]: lfagmap = my_settings["nam_io_offline"]["lfagmap"] logging.debug("%s %s", my_pgdfile, lfagmap) # Not run binary masterodb = None if not only_archive: # Normal dry or wet run exists = False if output is not None: exists = os.path.exists(output) if not exists or force: if binary is None: my_batch = None my_pgdfile = PGDFile( my_format, my_pgdfile, input_file=pgd_file_path, lfagmap=lfagmap, masterodb=True, ) my_prepfile = PREPFile( my_format, my_prepfile, input_file=prep_file_path, lfagmap=lfagmap, masterodb=True, ) surffile = SURFFile( my_format, my_surffile, archive_file=output, lfagmap=lfagmap, masterodb=True, ) masterodb = Masterodb( my_pgdfile, my_prepfile, surffile, my_settings, input_data, binary=binary, print_namelist=print_namelist, batch=my_batch, archive_data=my_archive, ) else: logging.info("%s already exists!", output) if archive is not None: if masterodb is not None: masterodb.archive_output() else: logging.info("Masterodb is None")
[docs]def run_surfex_binary(mode, **kwargs): """Run a surfex binary.""" logging.debug("ARGS: %s", kwargs) config, geo = get_geo_and_config_from_cmd(**kwargs) system_file_paths = kwargs["system_file_paths"] if os.path.exists(system_file_paths): system_file_paths = SystemFilePathsFromFile(system_file_paths) else: raise FileNotFoundError("File not found: " + system_file_paths) if "forcing_dir" in kwargs: system_file_paths.add_system_file_path("forcing_dir", kwargs["forcing_dir"]) pgd = False prep = False perturbed = False need_pgd = True need_prep = True check_existence = True if "tolerate_missing" in kwargs: if kwargs["tolerate_missing"]: check_existence = False prep_input_file = None if "prep_file" in kwargs: prep_input_file = kwargs["prep_file"] config.update_setting("SURFEX#PREP#FILE_WITH_PATH", prep_input_file) basename = prep_input_file if basename is not None: basename = os.path.basename(basename) config.update_setting("SURFEX#PREP#FILE", basename) prep_input_pgdfile = None if "prep_pgdfile" in kwargs: prep_input_pgdfile = kwargs["prep_pgdfile"] config.update_setting("SURFEX#PREP#PGDFILE_WITH_PATH", prep_input_pgdfile) basename = prep_input_pgdfile if basename is not None: basename = os.path.basename(basename) config.update_setting("SURFEX#PREP#FILEPGD", basename) prep_input_filetype = None if "prep_filetype" in kwargs: prep_input_filetype = kwargs["prep_filetype"] config.update_setting("SURFEX#PREP#FILETYPE", prep_input_filetype) prep_input_pgdfiletype = None if "prep_pgdfiletype" in kwargs: prep_input_pgdfiletype = kwargs["prep_pgdfiletype"] config.update_setting("SURFEX#PREP#FILEPGDTYPE", prep_input_pgdfiletype) # TODO perturbed_file_pattern = None if perturbed_file_pattern in kwargs: perturbed_file_pattern = kwargs["perturbed_file_pattern"] dtg = None if "dtg" in kwargs: if kwargs["dtg"] is not None and isinstance(kwargs["dtg"], str): dtg = as_datetime(kwargs["dtg"]) kwargs.update({"dtg": dtg}) if dtg is not None: config.update_setting("SURFEX#PREP#NYEAR", dtg.year) config.update_setting("SURFEX#PREP#NMONTH", dtg.month) config.update_setting("SURFEX#PREP#NDAY", dtg.day) xtime = (dtg - dtg.replace(hour=0, second=0, microsecond=0)).total_seconds() config.update_setting("SURFEX#PREP#XTIME", xtime) config.update_setting("SURFEX#SODA#HH", f"{dtg.hour:02d}") logging.debug("kwargs: %s", str(kwargs)) binary = kwargs["binary"] rte = kwargs["rte"] wrapper = kwargs["wrapper"] namelist_path = kwargs["namelist_path"] force = kwargs["force"] output = kwargs["output"] archive = kwargs["archive"] print_namelist = kwargs["print_namelist"] masterodb = kwargs["masterodb"] logging.debug("masterodb %s", masterodb) forc_zs = False if "forc_zs" in kwargs: forc_zs = kwargs["forc_zs"] if mode == "pgd": pgd = True need_pgd = False need_prep = False elif mode == "prep": prep = True need_prep = False elif mode == "offline": pass elif mode == "soda": pass elif mode == "perturbed": perturbed = True else: raise NotImplementedError(mode + " is not implemented!") pgd_file_path = None if need_pgd: pgd_file_path = kwargs["pgd"] prep_file_path = None if need_prep: prep_file_path = kwargs["prep"] pert = None if "pert" in kwargs: pert = kwargs["pert"] negpert = False if "negpert" in kwargs: negpert = kwargs["negpert"] try: basetime = kwargs["basetime"] if isinstance(basetime, str): basetime = as_datetime(basetime) except KeyError: basetime = None try: input_binary_data = kwargs["input_binary_data"] except KeyError: input_binary_data = None if os.path.exists(rte): with open(rte, mode="r", encoding="utf-8") as file_handler: rte = json.load(file_handler) my_batch = BatchJob(rte, wrapper=wrapper) else: raise FileNotFoundError("File not found: " + rte) my_archive = None if archive is not None: if os.path.exists(archive): my_archive = JsonOutputDataFromFile(archive) else: raise FileNotFoundError("File not found: " + archive) if not (output is not None and os.path.exists(output)) or force: if os.path.isfile(namelist_path): with open(namelist_path, mode="r", encoding="utf-8") as file_handler: nam_defs = yaml.safe_load(file_handler) nam_gen = NamelistGenerator(mode, config, nam_defs) my_settings = nam_gen.nml if mode == "pgd": my_settings = geo.update_namelist(my_settings) if input_binary_data is None: raise RuntimeError("input_binary_data not set") with open(input_binary_data, mode="r", encoding="utf-8") as file_handler: input_binary_data = json.load(file_handler) input_data = nam_gen.input_data_from_namelist( input_binary_data, system_file_paths, validtime=dtg, basetime=basetime ) elif os.path.isdir(namelist_path): if mode == "pgd": pgd = True need_pgd = False need_prep = False input_data = PgdInputData( config, system_file_paths, check_existence=check_existence ) elif mode == "prep": prep = True need_prep = False input_data = PrepInputData( config, system_file_paths, check_existence=check_existence, prep_file=prep_input_file, prep_pgdfile=prep_input_pgdfile, ) elif mode == "offline": input_data = OfflineInputData( config, system_file_paths, check_existence=check_existence ) elif mode == "soda": input_data = SodaInputData( config, system_file_paths, check_existence=check_existence, masterodb=kwargs["masterodb"], perturbed_file_pattern=perturbed_file_pattern, dtg=dtg, ) elif mode == "perturbed": perturbed = True input_data = OfflineInputData( config, system_file_paths, check_existence=check_existence ) else: raise NotImplementedError(mode + " is not implemented!") blocks = False if blocks: my_settings = Namelist( mode, config, namelist_path, forc_zs=forc_zs, prep_file=prep_input_file, prep_filetype=prep_input_filetype, prep_pgdfile=prep_input_pgdfile, prep_pgdfiletype=prep_input_pgdfiletype, dtg=dtg, fcint=3, ).get_namelist() else: my_settings = BaseNamelist( mode, config, namelist_path, forc_zs=forc_zs, prep_file=prep_input_file, prep_filetype=prep_input_filetype, prep_pgdfile=prep_input_pgdfile, prep_pgdfiletype=prep_input_pgdfiletype, dtg=dtg, fcint=3, ).get_namelist() geo.update_namelist(my_settings) else: raise NotImplementedError # Create input my_format = my_settings["nam_io_offline"]["csurf_filetype"] my_pgdfile = my_settings["nam_io_offline"]["cpgdfile"] my_prepfile = my_settings["nam_io_offline"]["cprepfile"] my_surffile = my_settings["nam_io_offline"]["csurffile"] lfagmap = False if "lfagmap" in my_settings["nam_io_offline"]: lfagmap = my_settings["nam_io_offline"]["lfagmap"] logging.debug("pgdfile=%s lfagmap=%s %s", my_pgdfile, lfagmap, pgd_file_path) logging.debug("nam_io_offline=%s", my_settings["nam_io_offline"]) if need_pgd: logging.debug("Need pgd") my_pgdfile = PGDFile( my_format, my_pgdfile, input_file=pgd_file_path, lfagmap=lfagmap, masterodb=masterodb, ) if need_prep: logging.debug("Need prep") my_prepfile = PREPFile( my_format, my_prepfile, input_file=prep_file_path, lfagmap=lfagmap, masterodb=masterodb, ) surffile = None if need_prep and need_pgd: logging.debug("Need pgd and prep") surffile = SURFFile( my_format, my_surffile, archive_file=output, lfagmap=lfagmap, masterodb=masterodb, ) if perturbed: PerturbedOffline( binary, my_batch, my_prepfile, pert, my_settings, input_data, pgdfile=my_pgdfile, surfout=surffile, archive_data=my_archive, print_namelist=print_namelist, negpert=negpert, ) elif pgd: my_pgdfile = PGDFile( my_format, my_pgdfile, input_file=pgd_file_path, archive_file=output, lfagmap=lfagmap, masterodb=masterodb, ) SURFEXBinary( binary, my_batch, my_pgdfile, my_settings, input_data, archive_data=my_archive, print_namelist=print_namelist, ) elif prep: my_prepfile = PREPFile( my_format, my_prepfile, archive_file=output, lfagmap=lfagmap, masterodb=masterodb, ) SURFEXBinary( binary, my_batch, my_prepfile, my_settings, input_data, pgdfile=my_pgdfile, archive_data=my_archive, print_namelist=print_namelist, ) else: SURFEXBinary( binary, my_batch, my_prepfile, my_settings, input_data, pgdfile=my_pgdfile, surfout=surffile, archive_data=my_archive, print_namelist=print_namelist, ) else: logging.info("%s already exists!", output)
[docs]def run_gridpp(**kwargs): """Gridpp.""" var = kwargs["var"] input_file = kwargs["input_file"] output_file = None if "output_file" in kwargs: output_file = kwargs["output_file"] hlength = kwargs["hlength"] vlength = 100000 if "vlength" in kwargs: vlength = kwargs["vlength"] wlength = 0 if "wlength" in kwargs: wlength = kwargs["wlength"] max_locations = 20 if "max_locations" in kwargs: max_locations = kwargs["max_locations"] elev_gradient = 0 if "elev_gradient" in kwargs: elev_gradient = kwargs["elev_gradient"] if elev_gradient != -0.0065 and elev_gradient != 0: raise RuntimeError("Not a valid elevation gradient") epsilon = None if "epsilon" in kwargs: epsilon = kwargs["epsilon"] minvalue = None if "minvalue" in kwargs: minvalue = kwargs["minvalue"] maxvalue = None if "maxvalue" in kwargs: maxvalue = kwargs["maxvalue"] only_diff = False if "only_diff" in kwargs: only_diff = kwargs["only_diff"] obs_file = kwargs["obs_file"] # Get input fields geo, validtime, background, glafs, gelevs = read_first_guess_netcdf_file( input_file, var ) an_time = validtime # Read OK observations observations = dataset_from_file(an_time, obs_file, qc_flag=0) logging.info("Found %s observations with QC flag == 0", str(len(observations.lons))) field = horizontal_oi( geo, background, observations, gelevs, hlength=hlength, vlength=vlength, wlength=wlength, structure_function="Barnes", max_locations=max_locations, elev_gradient=elev_gradient, epsilon=epsilon, minvalue=minvalue, maxvalue=maxvalue, interpol="bilinear", only_diff=only_diff, ) if output_file is not None: write_analysis_netcdf_file( output_file, field, var, validtime, gelevs, glafs, new_file=True, geo=geo )
[docs]def run_titan(**kwargs): """Titan.""" __, domain_geo = get_geo_and_config_from_cmd(**kwargs) if "domain_geo" in kwargs: if domain_geo is not None: logging.info("Override domain with domain_geo") with open(kwargs["domain_geo"], mode="r", encoding="utf-8") as fhandler: domain_geo = json.load(fhandler) blacklist = None if "blacklist" in kwargs: blacklist = kwargs["blacklist"] elif "blacklist_file" in kwargs: if kwargs["blacklist_file"] is not None: blacklist = json.load(open(kwargs["blacklist_file"], "r", encoding="utf-8")) if "input_file" in kwargs: input_file = kwargs["input_file"] if os.path.exists(input_file): settings = json.load(open(input_file, "r", encoding="utf-8")) else: raise FileNotFoundError("Could not find input file " + input_file) else: if "input_data" in kwargs: settings = kwargs["input_data"] else: raise RuntimeError("You must specify input_file or input_data") tests = kwargs["tests"] output_file = None if "output_file" in kwargs: output_file = kwargs["output_file"] indent = None if "indent" in kwargs: indent = kwargs["indent"] an_time = kwargs["dtg"] if isinstance(an_time, str): an_time = as_datetime(an_time) var = kwargs["variable"] tests = define_quality_control( tests, settings[var], an_time, domain_geo=domain_geo, blacklist=blacklist ) logging.debug("Settings: %s", settings) datasources = get_datasources(an_time, settings[var]["sets"]) data_set = TitanDataSet(var, settings[var], tests, datasources, an_time) data_set.perform_tests() if output_file is not None: data_set.write_output(output_file, indent=indent)
[docs]def run_oi2soda(**kwargs): """Oi2soda.""" t2m_file = kwargs["t2m_file"] rh2m_file = kwargs["rh2m_file"] sd_file = kwargs["sd_file"] sm_file = kwargs["sm_file"] output = kwargs["output"] t2m = None if t2m_file is not None: t2m = {"file": t2m_file, "var": kwargs["t2m_var"]} rh2m = None if rh2m_file is not None: rh2m = {"file": rh2m_file, "var": kwargs["rh2m_var"]} s_d = None if sd_file is not None: s_d = {"file": sd_file, "var": kwargs["sd_var"]} s_m = None if sm_file is not None: s_m = {"file": sm_file, "var": kwargs["sm_var"]} dtg = as_datetime(kwargs["dtg"]) oi2soda(dtg, t2m=t2m, rh2m=rh2m, s_d=s_d, s_m=s_m, output=output)
[docs]def run_hm2pysurfex(**kwargs): """Harmonie to pysurfex.""" pysurfex_config = kwargs["config"] output = None if "output" in kwargs: output = kwargs["output"] environment = os.environ if "environment" in kwargs: environment_file = kwargs["environment"] environment.update(json.load(open(environment_file, "r", encoding="utf-8"))) # Create configuration config = ConfigurationFromHarmonieAndConfigFile(environment, pysurfex_config) if output is None: logging.info("Config settings %s", config.settings) else: with open(output, "w", encoding="utf-8") as fhandler: toml.dump(config.settings, fhandler)
[docs]def set_geo_from_stationlist(**kwargs): """Set geometry from station list.""" stationlist = kwargs["stationlist"] lonrange = None if "lonrange" in kwargs: lonrange = kwargs["lonrange"] latrange = None if "latrange" in kwargs: latrange = kwargs["latrange"] if lonrange is None: lonrange = [-180, 180] if latrange is None: latrange = [-90, 90] lons = [] lats = [] if os.path.exists(stationlist): stids = json.load(open(stationlist, "r", encoding="utf-8")) else: raise FileNotFoundError("Station list does not exist!") for stid in stids: lon, lat = Observation.get_pos_from_stid(stationlist, [stid]) lon = lon[0] lat = lat[0] if lonrange[0] <= lon <= lonrange[1] and latrange[0] <= lat <= latrange[1]: lon = round(lon, 5) lat = round(lat, 5) lons.append(lon) lats.append(lat) d_x = ["0.3"] * len(lons) geo_json = { "nam_pgd_grid": {"cgrid": "LONLATVAL"}, "nam_lonlatval": {"xx": lons, "xy": lats, "xdx": d_x, "xdy": d_x}, } return LonLatVal(geo_json)
[docs]def sentinel_obs(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. """ if argv is None: argv = sys.argv[1:] kwargs = parse_sentinel_obs(argv) debug = kwargs.get("debug") if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ sentinel_obs ******************") fg_file = kwargs["fg_file"] infiles = kwargs["infiles"] step = kwargs["thinning"] output = kwargs["output"] varname = kwargs["varname"] indent = kwargs["indent"] fg_geo, validtime, grid_sm_fg, __, __ = read_first_guess_netcdf_file(fg_file, varname) obs_set = SentinelObservationSet(infiles, validtime, fg_geo, grid_sm_fg, step=step) obs_set.write_json_file(output, indent=indent)
[docs]def qc2obsmon(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. """ if argv is None: argv = sys.argv[1:] kwargs = parse_args_qc2obsmon(argv) debug = kwargs.get("debug") if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ qc2obsmon ******************") write_obsmon_sqlite_file(**kwargs)
[docs]def prep(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. """ if argv is None: argv = sys.argv[1:] kwargs = parse_args_surfex_binary(argv, "prep") debug = kwargs.get("debug") if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ prep ******************") run_surfex_binary("prep", **kwargs)
[docs]def plot_points(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. Raises: NotImplementedError: "Inputype is not implemented" RuntimeError: "No geo is set" RuntimeError: "No field read" ModuleNotFoundError: "Matplotlib is needed to plot" """ if argv is None: argv = sys.argv[1:] kwargs = parse_args_plot_points(argv) debug = kwargs.get("debug") if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ plot_points ******************") validtime = None if kwargs["variable"]["validtime"] is not None: validtime = as_datetime(kwargs["variable"]["validtime"]) output = kwargs["output"] geo_file = None if "geo" in kwargs: geo_file = kwargs["geo"] geo = None if geo_file is not None: domain_json = json.load(open(geo_file, "r", encoding="utf-8")) geo = get_geo_object(domain_json) contour = True if "no_contour" in kwargs: no_contour = kwargs["no_contour"] if no_contour: contour = False var_dict = kwargs["variable"] inputtype = kwargs["variable"]["inputtype"] defs = {"var": {inputtype: {"converter": {"none": var_dict}}}} converter_conf = defs["var"][inputtype]["converter"] inputtype = kwargs["variable"]["inputtype"] var = Variable(inputtype, kwargs["variable"], validtime) cache = Cache(-1) converter = "none" if inputtype == "obs": if geo is None: obs_input_type = kwargs["variable"]["filetype"] variable = kwargs["variable"]["variable"] obs_time = as_datetime(kwargs["variable"]["validtime"]) inputfile = kwargs["variable"]["inputfile"] geo = set_geo_from_obs_set( obs_time, obs_input_type, variable, inputfile, lonrange=None, latrange=None, ) converter = Converter(converter, validtime, defs, converter_conf, inputtype) field = ConvertedInput(geo, "var", converter).read_time_step(validtime, cache) if field is None: raise RuntimeError("No field read") if inputtype == "grib1" or inputtype == "grib2": title = f"{inputtype}: {var.file_var.generate_grib_id()} {validtime.strftime('%Y%m%d%H')}" elif inputtype == "netcdf": title = "netcdf: " + var.file_var.name + " " + validtime.strftime("%Y%m%d%H") elif inputtype == "surfex": title = inputtype + ": " + var.file_var.print_var() elif inputtype == "obs": obs_input_type = kwargs["variable"]["filetype"] variable = kwargs["variable"]["variable"] title = inputtype + ": var=" + variable + " type=" + obs_input_type else: raise NotImplementedError("Inputype is not implemented") if geo is None: raise RuntimeError("No geo is set") if field is None: raise RuntimeError("No field read") logging.debug( "npoints=%s nlons=%s nlats=%s contour=%s field.shape=%s", geo.npoints, geo.nlons, geo.nlats, contour, field.shape, ) if geo.npoints != geo.nlons and geo.npoints != geo.nlats: if contour: field = np.reshape(field, [geo.nlons, geo.nlats]) else: contour = False if plt is None: raise ModuleNotFoundError("Matplotlib is needed to plot") logging.debug( "lons.shape=%s lats.shape=%s field.shape=%s", geo.lons.shape, geo.lats.shape, field.shape, ) if contour: plt.contourf(geo.lons, geo.lats, field) else: plt.scatter(geo.lonlist, geo.latlist, c=field) plt.title(title) plt.colorbar() if output is None: plt.show() else: logging.info("Saving figure in %s", output) plt.savefig(output)
[docs]def plot_field(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. Raises: NotImplementedError: "Inputype is not implemented" RuntimeError: "No geo is set" RuntimeError: "No field read" ModuleNotFoundError: "Matplotlib is needed to plot" """ if argv is None: argv = sys.argv[1:] kwargs = parse_args_plot_field(argv) debug = kwargs.get("debug") if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ plot_field ******************") validtime = None if kwargs["validtime"] is not None: validtime = as_datetime(kwargs["validtime"]) output = kwargs["output"] geo_file = None if "geo" in kwargs: geo_file = kwargs["geo"] geo = None if geo_file is not None: domain_json = json.load(open(geo_file, "r", encoding="utf-8")) geo = get_geo_object(domain_json) contour = True if "no_contour" in kwargs: no_contour = kwargs["no_contour"] if no_contour: contour = False inputtype = kwargs["variable"]["inputtype"] var = Variable(inputtype, kwargs["variable"], validtime) field, geo = var.read_var_field(validtime) if inputtype == "grib1" or inputtype == "grib2": title = f"{inputtype}: {var.file_var.generate_grib_id()} {validtime.strftime('%Y%m%d%H')}" elif inputtype == "netcdf": title = "netcdf: " + var.file_var.var_name + " " + validtime.strftime("%Y%m%d%H") elif inputtype == "surfex": title = inputtype + ": " + var.file_var.print_var() else: raise NotImplementedError("Inputype is not implemented") if geo is None: raise RuntimeError("No geo is set") if field is None: raise RuntimeError("No field read") logging.debug( "npoints=%s nlons=%s nlats=%s contour=%s field.shape=%s", geo.npoints, geo.nlons, geo.nlats, contour, field.shape, ) if geo.npoints != geo.nlons and geo.npoints != geo.nlats: if contour: field = np.reshape(field, [geo.nlons, geo.nlats]) else: contour = False if plt is None: raise ModuleNotFoundError("Matplotlib is needed to plot") logging.debug( "lons.shape=%s lats.shape=%s field.shape=%s", geo.lons.shape, geo.lats.shape, field.shape, ) if contour: plt.contourf(geo.lons, geo.lats, field) else: plt.scatter(geo.lonlist, geo.latlist, c=field) plt.title(title) plt.colorbar() if output is None: plt.show() else: logging.info("Saving figure in %s", output) plt.savefig(output)
[docs]def pgd(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. """ if argv is None: argv = sys.argv[1:] kwargs = parse_args_surfex_binary(argv, "pgd") debug = kwargs.get("debug") if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ pgd ******************") run_surfex_binary("pgd", **kwargs)
[docs]def perturbed_offline(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. """ if argv is None: argv = sys.argv[1:] kwargs = parse_args_surfex_binary(argv, "perturbed") debug = kwargs.get("debug") if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ offline ******************") run_surfex_binary("perturbed", **kwargs)
[docs]def offline(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. """ if argv is None: argv = sys.argv[1:] kwargs = parse_args_surfex_binary(argv, "offline") debug = kwargs.get("debug") if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ offline ******************") run_surfex_binary("offline", **kwargs)
[docs]def cli_oi2soda(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. """ if argv is None: argv = sys.argv[1:] kwargs = parse_args_oi2soda(argv) debug = kwargs.get("debug") if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ oi2soda ******************") run_oi2soda(**kwargs)
[docs]def cli_modify_forcing(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. """ if argv is None: argv = sys.argv[1:] kwargs = parse_args_modify_forcing(argv) debug = kwargs.get("debug") if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ modify_forcing ******************") modify_forcing(**kwargs)
[docs]def cli_merge_qc_data(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. """ if argv is None: argv = sys.argv[1:] kwargs = parse_args_merge_qc_data(argv) debug = kwargs.get("debug") if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ merge_qc_data ******************") qc_data = merge_json_qc_data_sets(kwargs.get("validtime"), kwargs.get("filenames")) qc_data.write_output(kwargs.get("output"), indent=kwargs.get("indent"))
[docs]def masterodb(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. """ if argv is None: argv = sys.argv[1:] kwargs = parse_args_masterodb(argv) debug = kwargs.get("debug") if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ masterodb ******************") run_masterodb(**kwargs)
[docs]def hm2pysurfex(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. """ if argv is None: argv = sys.argv[1:] kwargs = parse_args_hm2pysurfex(argv) debug = kwargs.get("debug") if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ hm2pysurfex ******************") run_hm2pysurfex(**kwargs)
[docs]def gridpp(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. """ if argv is None: argv = sys.argv[1:] kwargs = parse_args_gridpp(argv) debug = kwargs.get("debug") if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ gridpp ******************") run_gridpp(**kwargs)
[docs]def dump_environ(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. """ if argv is None: argv = sys.argv[1:] kwargs = parse_args_dump_environ(argv) outputfile = kwargs["outputfile"] with open(outputfile, mode="w", encoding="utf-8") as file_handler: json.dump(os.environ.copy(), file_handler)
[docs]def first_guess_for_oi(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. """ if argv is None: argv = sys.argv[1:] kwargs = parse_args_first_guess_for_oi(argv) debug = kwargs.get("debug") if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ FirstGuess4gridpp ******************") run_first_guess_for_oi(**kwargs)
[docs]def cryoclim_pseudoobs(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. """ if argv is None: argv = sys.argv[1:] kwargs = parse_cryoclim_pseudoobs(argv) debug = kwargs.get("debug") if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ cryoclim_pseudoobs ******************") fg_file = kwargs["fg"]["inputfile"] infiles = kwargs["infiles"] step = kwargs["thinning"] output = kwargs["output"] varname = kwargs["fg"]["varname"] indent = kwargs["indent"] laf_threshold = kwargs["laf_threshold"] cryo_varname = kwargs["cryo_varname"] fg_geo, validtime, grid_snow_fg, glafs, gelevs = read_first_guess_netcdf_file( fg_file, varname ) grid_perm_snow = None grid_perm_snow_geo = None if "perm_snow" in kwargs: fileformat = kwargs["perm_snow"].get("inputtype") if fileformat is not None: lvalidtime = kwargs["perm_snow"].get("validtime") if lvalidtime is None: lvalidtime = validtime else: lvalidtime = as_datetime(lvalidtime) grid_perm_snow, grid_perm_snow_geo = Variable( fileformat, kwargs["perm_snow"], lvalidtime ).read_var_field(lvalidtime) grid_slope = None grid_slope_geo = None if "slope" in kwargs: fileformat = kwargs["slope"].get("inputtype") if fileformat is not None: lvalidtime = kwargs["slope"].get("validtime") if lvalidtime is None: lvalidtime = validtime else: lvalidtime = as_datetime(lvalidtime) grid_slope, grid_slope_geo = Variable( fileformat, kwargs["slope"], lvalidtime ).read_var_field(lvalidtime) obs_set = CryoclimObservationSet( infiles, validtime, fg_geo, grid_snow_fg, gelevs, perm_snow=grid_perm_snow, perm_snow_geo=grid_perm_snow_geo, slope=grid_slope, slope_geo=grid_slope_geo, step=step, glaf=glafs, laf_threshold=laf_threshold, cryo_varname=cryo_varname, ) obs_set.write_json_file(output, indent=indent)
[docs]def create_namelist(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. Raises: FileNotFoundError: "File not found:" """ if argv is None: argv = sys.argv[1:] kwargs = parse_args_create_namelist(argv) debug = kwargs.get("debug") mode = kwargs.get("mode") if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ %s ******************", mode) logging.debug("ARGS: %s", kwargs) config, __ = get_geo_and_config_from_cmd(**kwargs) mode = kwargs.get("mode") system_file_paths = kwargs["system_file_paths"] if os.path.exists(system_file_paths): system_file_paths = SystemFilePathsFromFile(system_file_paths) else: raise FileNotFoundError("File not found: " + system_file_paths) output = kwargs.get("output") if output is None: output = "OPTIONS.nam" namelist_path = kwargs.get("namelist_path") with open(namelist_path, mode="r", encoding="utf-8") as file_handler: nam_defs = yaml.safe_load(file_handler) config.update_setting("SURFEX#PREP#FILE", None) config.update_setting("SURFEX#PREP#FILEPGD", None) config.update_setting("SURFEX#SODA#HH", "00") my_settings = NamelistGenerator(mode, config, nam_defs) if os.path.exists(output): os.remove(output) my_settings.write(output)
[docs]def create_lsm_file_assim(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. Raises: FileNotFoundError: Domain file not found """ if argv is None: argv = sys.argv[1:] kwargs = parse_args_lsm_file_assim(argv) debug = kwargs.get("debug") if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ create_lsm_fil ******************") domain = kwargs["domain"] logging.debug("domain=%s", domain) if os.path.exists(domain): domain_json = json.load(open(domain, "r", encoding="utf-8")) geo = get_geo_object(domain_json) else: raise FileNotFoundError(domain) validtime = as_datetime(kwargs["dtg"]) # TODO Move to a method outside cli cache = Cache(3600) inputfile = kwargs["file"] fileformat = kwargs["fileformat"] converter = kwargs["converter"] output = kwargs["output"] var = kwargs["var"] defs = { "filepattern": inputfile, "fileformat": fileformat, "filetype": "surf", "fcint": 10800, "offset": 0, } logging.debug("%s %s", var, fileformat) converter_conf = {"none": {"name": var, "varname": var}} var = "LSM" initial_basetime = validtime - as_timedelta(seconds=10800) converter = Converter(converter, initial_basetime, defs, converter_conf, fileformat) field = ConvertedInput(geo, var, converter).read_time_step(validtime, cache) field = np.reshape(field, [geo.nlons, geo.nlats]) field = np.transpose(field) with open(output, mode="w", encoding="utf-8") as file_handler: for lat in range(0, geo.nlats): for lon in range(0, geo.nlons): file_handler.write(str(field[lat, lon]) + "\n")
[docs]def create_forcing(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. """ if argv is None: argv = sys.argv[1:] kwargs = parse_args_create_forcing(argv) debug = kwargs.get("debug") if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ create_forcing ******************") options, var_objs, att_objs = set_forcing_config(**kwargs) run_time_loop(options, var_objs, att_objs)
[docs]def bufr2json(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. """ if argv is None: argv = sys.argv[1:] kwargs = parse_args_bufr2json(argv) debug = kwargs.get("debug") if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ bufr2json ******************") logging.warning("This method is depreciated. Please use create_obsset_file") variables = kwargs.get("vars") bufrfile = kwargs.get("bufr") output = kwargs.get("output") valid_dtg = as_datetime(kwargs.get("dtg")) valid_range = as_timedelta(seconds=kwargs.get("valid_range")) sigmao = kwargs.get("sigmao") label = kwargs.get("label") indent = kwargs.get("indent") lonrange = kwargs.get("lonrange") latrange = kwargs.get("latrange") create_obsset_file( valid_dtg, "bufr", variables, bufrfile, output, pos_t_range=valid_range, lonrange=lonrange, latrange=latrange, label=label, indent=indent, sigmao=sigmao, )
[docs]def obs2json(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. """ if argv is None: argv = sys.argv[1:] kwargs = parse_args_obs2json(argv) debug = kwargs.get("debug") if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ obs2json ******************") obs_type = kwargs.get("obs_type") variables = kwargs.get("vars") inputfile = kwargs.get("inputfile") output = kwargs.get("output") obs_time = as_datetime(kwargs.get("obs_time")) label = kwargs.get("label") unit = kwargs.get("unit") level = kwargs.get("level") obtypes = kwargs.get("obtypes") subtypes = kwargs.get("subtypes") sigmao = kwargs.get("sigmao") pos_t_range = kwargs.get("pos_t_range") neg_t_range = kwargs.get("neg_t_range") indent = kwargs.get("indent") lonrange = kwargs.get("lonrange") latrange = kwargs.get("latrange") if pos_t_range is not None: pos_t_range = as_timedelta(seconds=pos_t_range) if neg_t_range is not None: neg_t_range = as_timedelta(seconds=neg_t_range) create_obsset_file( obs_time, obs_type, variables, inputfile, output, pos_t_range=pos_t_range, neg_t_range=neg_t_range, lonrange=lonrange, latrange=latrange, label=label, unit=unit, level=level, indent=indent, obtypes=obtypes, subtypes=subtypes, sigmao=sigmao, )
[docs]def cli_set_domain(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. Raises: FileNotFoundError: File not found RuntimeError: Domain not provided """ if argv is None: argv = sys.argv[1:] args = parse_set_domain(argv) debug = args.debug if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ set_domain ******************") domain = args.domain domains = args.domains output = args.output indent = args.indent harmonie_mode = args.harmonie if os.path.exists(domains): with open(domains, mode="r", encoding="utf-8") as file_handler: domains = json.load(file_handler) domain_json = set_domain(domains, domain, hm_mode=harmonie_mode) if domain_json is not None: with open(output, mode="w", encoding="utf-8") as file_handler: json.dump(domain_json, file_handler, indent=indent) else: raise RuntimeError("Domain not provided") else: raise FileNotFoundError
[docs]def cli_set_geo_from_obs_set(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. """ if argv is None: argv = sys.argv[1:] kwargs = parse_args_set_geo_from_obs_set(argv) debug = kwargs.get("debug") if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ set_geo_from_obs_set ******************") validtime = as_datetime(kwargs["validtime"]) geo = set_geo_from_obs_set( validtime, kwargs["obs_type"], kwargs["variable"], kwargs["inputfile"], kwargs["lonrange"], kwargs["latrange"], ) output = kwargs["output"] with open(output, mode="w", encoding="utf-8") as file_handler: json.dump(geo.json, file_handler)
[docs]def cli_set_geo_from_stationlist(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. """ if argv is None: argv = sys.argv[1:] kwargs = parse_args_set_geo_from_stationlist(argv) debug = kwargs.get("debug") if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ set_geo_from_stationlist ******************") geo = set_geo_from_stationlist(**kwargs) output = kwargs["output"] with open(output, mode="w", encoding="utf-8") as file_handler: json.dump(geo.json, file_handler)
[docs]def cli_shape2ign(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. """ if argv is None: argv = sys.argv[1:] kwargs = parse_args_shape2ign(argv) debug = kwargs.get("debug") if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ shape2ign ******************") catchment = kwargs.get("catchment") infile = kwargs.get("infile") output = kwargs.get("output") indent = kwargs.get("indent") ref_proj = kwargs.get("ref_proj") shape2ign(catchment, infile, output, ref_proj, indent=indent)
[docs]def soda(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. """ if argv is None: argv = sys.argv[1:] kwargs = parse_args_surfex_binary(argv, "soda") debug = kwargs.get("debug") if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ soda ******************") run_surfex_binary("soda", **kwargs)
[docs]def titan(argv=None): """Command line interface. Args: argv(list, optional): Arguments. Defaults to None. """ if argv is None: argv = sys.argv[1:] kwargs = parse_args_titan(argv) debug = kwargs.get("debug") if debug: logging.basicConfig( format="%(asctime)s %(levelname)s %(pathname)s:%(lineno)s %(message)s", level=logging.DEBUG, ) else: logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", level=logging.INFO ) logging.info("************ titan ******************") run_titan(**kwargs)