Source code for pysurfex.input_methods

"""Input methods."""
import glob
import logging
import os

from .bufr import BufrObservationSet
from .datetime_utils import as_timedelta
from .geo import LonLatVal
from .obs import JsonObservationSet, MetFrostObservations, NetatmoObservationSet
from .obsoul import ObservationDataSetFromObsoulFile
from .util import parse_filepattern


def _copy_present(conf, kwargs, keys):
    """Copy each key of ``keys`` that is present in ``conf`` into ``kwargs``."""
    for key in keys:
        if key in conf:
            kwargs[key] = conf[key]


def _value_or_default(conf, key, default):
    """Return ``conf[key]``, or ``default`` when the key is missing or None."""
    value = conf.get(key)
    return default if value is None else value


def _optional_int(conf, key):
    """Return ``int(conf[key])``, or None when the key is missing or None."""
    value = conf.get(key)
    return None if value is None else int(value)


def _resolve_single_file(filepattern, obs_time, validtime):
    """Expand a file pattern (or a one-element list of patterns) to a file name."""
    if isinstance(filepattern, list):
        if len(filepattern) > 1:
            raise NotImplementedError("Only one file reading implemented")
        filepattern = filepattern[0]
    return parse_filepattern(filepattern, obs_time, validtime)


def get_datasources(obs_time, settings):
    """Get data sources.

    Main data source interface setting data ObservationSet objects based on
    settings dictionary.

    Every key of ``settings`` is used as the label of one observation set and
    maps to a dict describing it. The ``filetype`` entry (case-insensitive)
    selects the reader: ``bufr``, ``netatmo``, ``frost``, ``obsoul`` or
    ``json``. An entry without ``filetype`` is skipped with an info message.
    For ``bufr``, ``obsoul`` and ``json`` a file that does not exist is skipped
    with a warning instead of raising.

    The settings ``dt``, ``neg_t_range``, ``pos_t_range``, ``neg_dt``,
    ``pos_dt`` and ``obnumber`` are treated as unset when their value is None,
    so that the defaults of this function apply.

    Args:
        obs_time (datetime.datetime): Observation time
        settings (dict): Settings

    Raises:
        NotImplementedError: Unknown observation file format
        NotImplementedError: Only one file reading implemented
        RuntimeError: No filenames or filepattern found
        RuntimeError: You must set variable name
        RuntimeError: You must set varname to read NETATMO JSON files

    Returns:
        datasources(list): List of observation data sets

    """
    datasources = []
    for obs_set, conf in settings.items():
        if "filetype" not in conf:
            logging.info("No file type provided")
            continue

        kwargs = {"label": obs_set}
        filetype = conf["filetype"].lower()
        filepattern = conf.get("filepattern")
        validtime = obs_time

        if filetype == "bufr":
            filename = _resolve_single_file(filepattern, obs_time, validtime)
            if "varname" not in conf:
                raise RuntimeError("You must set variable name")
            varname = conf["varname"]
            _copy_present(conf, kwargs, ("lonrange", "latrange", "sigmao"))
            # Accepted time deviation around obs_time, in seconds.
            valid_range = as_timedelta(seconds=_value_or_default(conf, "dt", 1800))
            if os.path.exists(filename):
                datasources.append(
                    BufrObservationSet(
                        filename, varname, obs_time, valid_range, **kwargs
                    )
                )
            else:
                logging.warning(
                    "WARNING: filename %s not existing. Not added.", filename
                )

        elif filetype == "netatmo":
            filenames = conf.get("filenames")
            if filenames is None:
                if "filepattern" not in conf:
                    raise RuntimeError("No filenames or filepattern found")
                filepattern = conf["filepattern"]
                # The ranges are multiplied by 60 to get seconds.
                neg_t_range = _value_or_default(conf, "neg_t_range", 15)
                pos_t_range = _value_or_default(conf, "pos_t_range", 15)
                dtg = validtime - as_timedelta(seconds=int(neg_t_range) * 60)
                end_dtg = validtime + as_timedelta(seconds=int(pos_t_range) * 60)

                # Probe the pattern once per minute over the window and keep
                # each unambiguous (exactly one match) file only once.
                filenames = []
                while dtg < end_dtg:
                    matches = glob.glob(parse_filepattern(filepattern, dtg, dtg))
                    if len(matches) == 1:
                        fname = matches[0]
                        if os.path.exists(fname) and fname not in filenames:
                            filenames.append(fname)
                    dtg = dtg + as_timedelta(seconds=60)

            if "varname" not in conf:
                raise RuntimeError("You must set varname to read NETATMO JSON files")
            variable = conf["varname"]
            _copy_present(conf, kwargs, ("lonrange", "latrange", "sigmao"))
            kwargs["dt"] = _value_or_default(conf, "dt", 1800)
            # filenames is never None here; an empty list is passed on as is.
            datasources.append(
                NetatmoObservationSet(filenames, variable, obs_time, **kwargs)
            )

        elif filetype == "frost":
            if "varname" not in conf:
                raise RuntimeError("You must set variable name")
            varname = conf["varname"]
            _copy_present(
                conf, kwargs, ("lonrange", "latrange", "unit", "sigmao", "level")
            )
            kwargs["validtime"] = obs_time
            datasources.append(MetFrostObservations(varname, **kwargs))

        elif filetype == "obsoul":
            filename = _resolve_single_file(filepattern, obs_time, validtime)
            # Convert before the existence check so that malformed values are
            # reported even when the file is missing.
            obnumber = _optional_int(conf, "obnumber")
            neg_dt = _optional_int(conf, "neg_dt")
            pos_dt = _optional_int(conf, "pos_dt")
            if os.path.exists(filename):
                datasources.append(
                    ObservationDataSetFromObsoulFile(
                        filename,
                        an_time=obs_time,
                        neg_dt=neg_dt,
                        pos_dt=pos_dt,
                        obtypes=conf.get("obtypes"),
                        subtypes=conf.get("subtypes"),
                        obnumber=obnumber,
                        sigmao=conf.get("sigmao"),
                    )
                )
            else:
                logging.warning(
                    "WARNING: filename %s not existing. Not added.", filename
                )

        elif filetype == "json":
            filename = _resolve_single_file(filepattern, obs_time, validtime)
            kwargs["var"] = conf.get("varname")
            _copy_present(conf, kwargs, ("sigmao",))
            if os.path.exists(filename):
                datasources.append(JsonObservationSet(filename, **kwargs))
            else:
                logging.warning(
                    "WARNING: filename %s not existing. Not added.", filename
                )

        else:
            raise NotImplementedError("Unknown observation file format")

    return datasources


def set_geo_from_obs_set(
    obs_time, obs_type, varname, inputfile, lonrange=None, latrange=None
):
    """Set geometry from obs file.

    The observation positions are read through ``get_datasources`` and the
    positions inside the longitude/latitude ranges (bounds inclusive) are
    turned into a LONLATVAL geometry. Coordinates are rounded to 5 decimals.

    Args:
        obs_time (as_datetime): Observation time
        obs_type (str): Observation file type
        varname (str): Observation variable
        inputfile (str): Input file with obs set
        lonrange (tuple, optional): Longitude range (min, max). Defaults to None,
                                    which means [-180, 180].
        latrange (tuple, optional): Latitude range (min, max). Defaults to None,
                                    which means [-90, 90].

    Returns:
        geo (Geo): Surfex geometry

    """
    settings = {
        "obs": {
            "varname": varname,
            "filetype": obs_type,
            "inputfile": inputfile,
            "filepattern": inputfile,
        }
    }
    if lonrange is None:
        lonrange = [-180, 180]
    if latrange is None:
        latrange = [-90, 90]

    logging.debug("%s", settings)
    logging.debug("Get data source")
    # Only the 2nd and 3rd of the eight values from get_obs() are used here.
    __, lons, lats, __, __, __, __, __ = get_datasources(obs_time, settings)[
        0
    ].get_obs()

    selected_lons = []
    selected_lats = []
    for lon, lat in zip(lons, lats):
        if lonrange[0] <= lon <= lonrange[1] and latrange[0] <= lat <= latrange[1]:
            selected_lons.append(round(lon, 5))
            selected_lats.append(round(lat, 5))

    # One grid-size entry per selected point, used for both xdx and xdy.
    d_x = ["0.3"] * len(selected_lons)
    geo_json = {
        "nam_pgd_grid": {"cgrid": "LONLATVAL"},
        "nam_lonlatval": {
            "xx": selected_lons,
            "xy": selected_lats,
            "xdx": d_x,
            "xdy": d_x,
        },
    }
    return LonLatVal(geo_json)


def _as_whole_seconds(duration):
    """Convert a timedelta-like object or a number to whole seconds (None stays None)."""
    if duration is None:
        return None
    if hasattr(duration, "total_seconds"):
        return int(duration.total_seconds())
    return int(duration)


def get_obsset(
    obs_time,
    obs_type,
    varname,
    inputfile,
    lonrange=None,
    latrange=None,
    label=None,
    neg_t_range=None,
    pos_t_range=None,
    unit=None,
    level=None,
    obtypes=None,
    subtypes=None,
    sigmao=None,
):
    """Create an observation set from an input data set.

    A single-entry settings dictionary is built from the arguments and handed
    to ``get_datasources``; the first data source returned is the result.

    Args:
        obs_time (as_datetime): Observation time
        obs_type (str): Observation file type
        varname (list): Observation variable(s). A single string is wrapped
                        in a list.
        inputfile (list): Input file(s) with obs set. A single string is
                          wrapped in a list.
        pos_t_range (timedelta or int, optional): Time window duration after
                                                  obs_time. Plain numbers are
                                                  taken as seconds.
        neg_t_range (timedelta or int, optional): Time window duration before
                                                  obs_time. Plain numbers are
                                                  taken as seconds.
        lonrange (tuple, optional): Longitude range (min, max). Defaults to None,
                                    which means [-180, 180].
        latrange (tuple, optional): Latitude range (min, max). Defaults to None,
                                    which means [-90, 90].
        label (str, optional): Obs set label. Default to None which means it
                               will be the same as obs_type
        unit (str, optional): Unit (FROST)
        level (str, optional): Level (FROST)
        obtypes (list, optional): Obstypes (obsoul)
        subtypes (list, optional): Subtypes (obsoul)
        sigmao (float, optional): Observation error relative to normal background
                                  error. Defaults to None.

    Returns:
        obsset (ObservationSet): Observation set

    """
    if label is None:
        label = obs_type
    if isinstance(varname, str):
        varname = [varname]
    if isinstance(inputfile, str):
        inputfile = [inputfile]
    if lonrange is None:
        lonrange = [-180, 180]
    if latrange is None:
        latrange = [-90, 90]

    pos_t_range_seconds = _as_whole_seconds(pos_t_range)
    neg_t_range_seconds = _as_whole_seconds(neg_t_range)
    # "dt" follows the positive range when given, otherwise the negative one.
    dt_seconds = (
        pos_t_range_seconds if pos_t_range_seconds is not None else neg_t_range_seconds
    )

    obs_settings = {
        "varname": varname,
        "filetype": obs_type,
        "inputfile": inputfile,
        "filepattern": inputfile,
        "label": label,
        "lonrange": lonrange,
        "latrange": latrange,
        "unit": unit,
        "level": level,
        "obtypes": obtypes,
        "subtypes": subtypes,
        "sigmao": sigmao,
    }
    # get_datasources decides between a setting and its default by key
    # presence, so unset time windows must be left out instead of stored as None.
    # NOTE(review): the netatmo reader multiplies pos_t_range/neg_t_range by 60
    # before using them as seconds - confirm the unit intended here.
    time_settings = {
        "dt": dt_seconds,
        "pos_t_range": pos_t_range_seconds,
        "neg_t_range": neg_t_range_seconds,
    }
    for key, value in time_settings.items():
        if value is not None:
            obs_settings[key] = value

    settings = {label: obs_settings}
    logging.debug("%s", settings)
    logging.debug("Get data source")
    return get_datasources(obs_time, settings)[0]


def create_obsset_file(
    obs_time,
    obs_type,
    varname,
    inputfile,
    output,
    lonrange=None,
    latrange=None,
    label=None,
    indent=None,
    neg_t_range=None,
    pos_t_range=None,
    unit=None,
    level=None,
    obtypes=None,
    subtypes=None,
    sigmao=None,
):
    """Create an observation set from an input data set and write it to a JSON file.

    The observation set is created with ``get_obsset`` and written with its
    ``write_json_file`` method.

    Args:
        obs_time (as_datetime): Observation time
        obs_type (str): Observation file type
        varname (list): Observation variable(s)
        inputfile (list): Input file(s) with obs set
        output (str): Output file
        pos_t_range (timedelta, optional): Time window duration after obs_time
        neg_t_range (timedelta, optional): Time window duration before obs_time
        lonrange (tuple, optional): Longitude range (min, max). Defaults to None.
        latrange (tuple, optional): Latitude range (min, max). Defaults to None.
        label (str, optional): Obs set label. Default to None which means it
                               will be the same as obs_type
        indent (int, optional): File indentation. Defaults to None.
        unit (str, optional): Unit (FROST)
        level (str, optional): Level (FROST)
        obtypes (list, optional): Obstypes (obsoul)
        subtypes (list, optional): Subtypes (obsoul)
        sigmao (float, optional): Observation error relative to normal background
                                  error. Defaults to None.

    """
    logging.debug("Get data source")
    obsset = get_obsset(
        obs_time,
        obs_type,
        varname,
        inputfile,
        lonrange=lonrange,
        latrange=latrange,
        label=label,
        neg_t_range=neg_t_range,
        pos_t_range=pos_t_range,
        unit=unit,
        level=level,
        obtypes=obtypes,
        subtypes=subtypes,
        sigmao=sigmao,
    )
    obsset.write_json_file(output, indent=indent)