Source code for pysurfex.variable

"""Variable."""
import copy
import logging

import numpy as np

from .datetime_utils import as_timedelta
from .fa import Fa
from .file import SurfexFileVariable, get_surfex_io_object
from .geo import get_geo_object
from .grib import Grib, Grib1Variable, Grib2Variable
from .input_methods import get_datasources
from .netcdf import Netcdf, NetCDFReadVariable
from .util import parse_filepattern


[docs]class Variable(object): """New combined variable."""
[docs] def __init__(self, var_type, var_dict, initial_basetime, prefer_forecast=True): """Construct variable. Args: var_type (str): Variable type. var_dict (dict): Variable definitions initial_basetime (datetime): Initial basetime prefer_forecast (bool, optional): Prefer forecasts instead of analysis. Defaults to True. Raises: RuntimeError: No filepattern provided RuntimeError: variable must have attribute RuntimeError: You can not have larger offset than the frequency of forecasts """ self.var_type = var_type self.var_dict = copy.deepcopy(var_dict) try: self.interval = var_dict["timestep"] except KeyError: self.interval = 3600 try: self.filepattern = var_dict["filepattern"] except KeyError: raise RuntimeError("No filepattern provided") from KeyError self.initial_basetime = initial_basetime try: self.fcint = int(self.var_dict["fcint"]) except KeyError: self.fcint = 10800 try: self.offset = int(self.var_dict["offset"]) except KeyError: self.offset = 0 try: self.prefer_forecast = self.var_dict["prefer_forecast"] except KeyError: self.prefer_forecast = prefer_forecast if self.offset > self.fcint: raise RuntimeError( "You can not have larger offset than the frequency of forecasts " + str(self.offset) + " > " + str(self.fcint) ) accumulated, instant, file_var = self.set_var(validtime=self.initial_basetime) self.file_var = file_var self.instant = instant self.accumulated = accumulated logging.debug("Constructed variable for %s", str(self.var_dict))
[docs] def get_filename(self, validtime, previoustime=None): """Get the filename. Args: validtime (datetime.datetime): Valid time. previoustime (datetime.datetime, optional): Previous valid time. Defaults to None. Returns: str: Parsed filename """ logging.debug("Set basename for filename") basetime = None if validtime is not None: basetime = self.get_basetime(validtime, previoustime=previoustime) if previoustime is not None: validtime = previoustime return parse_filepattern(self.filepattern, basetime, validtime)
[docs] def get_filehandler(self, validtime, cache=None, previoustime=None): """Get the file handler. Args: validtime (datetime.datetime): Valid time. cache (surfex.Cache, optional): Cache. Defaults to None. previoustime (datetime.datetime, optional): Previous valid time. Defaults to None. Raises: NotImplementedError: Variable type not implemented Returns: tuple: Filehandler and file name """ filename = self.get_filename(validtime, previoustime=previoustime) if cache is not None and cache.file_open(filename): file_handler = cache.get_file_handler(filename) else: if self.var_type == "netcdf": file_handler = Netcdf(filename) elif self.var_type == "grib1" or self.var_type == "grib2": file_handler = Grib(filename) elif self.var_type == "fa": file_handler = Fa(filename) elif self.var_type == "surfex": try: fileformat = self.var_dict["fileformat"] except KeyError: fileformat = None try: filetype = self.var_dict["filetype"] except KeyError: filetype = None geo_in = None if "geo_input" in self.var_dict: geo_in = self.var_dict["geo_input"] elif "geo_input_file" in self.var_dict: geo_in_file = self.var_dict["geo_input_file"] geo_in = get_geo_object(open(geo_in_file, "r", encoding="utf-8")) file_handler = get_surfex_io_object( filename, fileformat=fileformat, filetype=filetype, geo=geo_in ) elif self.var_type == "obs": var_dict = self.var_dict var_dict = {"set": var_dict} basetime = self.get_basetime(validtime) file_handler = get_datasources(basetime, var_dict)[0] else: raise NotImplementedError if cache is not None: cache.set_file_handler(filename, file_handler) logging.debug("var_type: %s", self.var_type) logging.debug("filename: %s", filename) return file_handler, filename
[docs] def read_var_field(self, validtime, cache=None): """Read points for a variable. Args: validtime (datetime.datetime): Valid time cache (surfex.Cache, optional): Cache. Defaults to None. Returns: numpy.darray: A numpy array with point values for variable Raises: NotImplementedError: "Field not defined for point data" """ logging.debug("set basetime from %s", validtime) filehandler, filename = self.get_filehandler(validtime, cache=cache) id_str = None if cache is not None: id_str = cache.generate_id(self.var_type, self.file_var, filename, validtime) if cache is not None and cache.is_saved(id_str): field = cache.saved_fields[id_str] logging.info("Using cached value for %s", id_str) else: if self.var_type == "obs": raise NotImplementedError("Field not defined for point data") else: field, geo_read = filehandler.field(self.file_var, validtime=validtime) if field is not None: logging.debug("field.shape %s", field.shape) if cache is not None: logging.debug("ID_STRING %s", id_str) cache.save_field(id_str, field) return field, geo_read
[docs] def read_var_points(self, var, geo, validtime, previoustime=None, cache=None): """Read points for a variable. Args: var (object): Variable object geo (surfex.Geo): Surfex geometry validtime (datetime.datetime): Valid time previoustime (datetime.datetime, optional): Previous valid time for accumulated variables. Defaults to None. cache (surfex.Cache, optional): Cache. Defaults to None. Returns: numpy.darray: A numpy array with point values for variable """ interpolation = "bilinear" if self.var_type != "obs": if "interpolator" in self.var_dict: interpolation = self.var_dict["interpolator"] logging.debug("set basetime from %s", validtime) filehandler, filename = self.get_filehandler( validtime, cache=cache, previoustime=previoustime ) if previoustime is not None: validtime = previoustime logging.debug("new validtime to read previous time %s", validtime) if filehandler is None: logging.warning("No file handler exist for this time step") field = np.array([len(geo.lons)]) field = field.fill(np.nan) else: id_str = None if cache is not None: id_str = cache.generate_id(self.var_type, var, filename, validtime) if cache is not None and cache.is_saved(id_str): field = cache.saved_fields[id_str] logging.info("Using cached value for %s", id_str) else: if self.var_type == "obs": __, field, __ = filehandler.points(geo, validtime=validtime) else: field, interpolator = filehandler.points( var, geo, interpolation=interpolation, validtime=validtime ) field = self.rotate_geographic_wind(field, interpolator) if field is not None: logging.debug("field.shape %s", field.shape) if cache is not None: logging.debug("ID_STRING %s", id_str) cache.save_field(id_str, field) return field
[docs] def set_var(self, validtime=None): """Set the variable. Args: validtime (datetime.datetime, optional): Valid time. Defaults to None. Raises: NotImplementedError: Variable not implemented RuntimeError: You must provide name RuntimeError: grib1 needs type or levelType Returns: tuple: accumulated, instant, var """ accumulated = False if self.var_type == "netcdf": try: name = self.var_dict["name"] except KeyError: raise RuntimeError("You must provide name") from KeyError try: accumulated = self.var_dict["accumulated"] except KeyError: accumulated = False try: level = self.var_dict["level"] if not isinstance(level, list): level = [level] except KeyError: level = None try: units = str([self.var_dict["units"]][0]) except KeyError: units = None try: member = self.var_dict["member"] if member is not None: if not isinstance(member, list): member = [member] except KeyError: member = None var = NetCDFReadVariable(name, level=level, units=units, member=member) elif self.var_type == "grib1": try: par = self.var_dict["parameter"] except KeyError: raise RuntimeError("You must provide parameter") from KeyError try: typ = self.var_dict["type"] except KeyError: try: typ = self.var_dict["levelType"] except KeyError: raise RuntimeError("grib1 needs type or levelType") from KeyError try: level = self.var_dict["level"] except KeyError: raise RuntimeError("grib1 needs level") from KeyError try: tri = self.var_dict["tri"] except KeyError: tri = 0 if tri == 4: accumulated = True var = Grib1Variable(par, typ, level, tri=tri) elif self.var_type == "grib2": discipline = self.var_dict["discipline"] p_c = self.var_dict["parameterCategory"] p_n = self.var_dict["parameterNumber"] l_t = self.var_dict["levelType"] lev = self.var_dict["level"] try: tsp = self.var_dict["typeOfStatisticalProcessing"] except KeyError: tsp = -1 if tsp == 1: accumulated = True var = Grib2Variable(discipline, p_c, p_n, l_t, lev, tsp=tsp) elif self.var_type == "surfex": varname = self.var_dict["varname"] layers = None if "layers" in self.var_dict: layers = self.var_dict["layers"] patches = None if "patches" in self.var_dict: patches = self.var_dict["patches"] if "accumulated" in self.var_dict: accumulated = self.var_dict["accumulated"] datatype = "float" if "datatype" in self.var_dict: datatype = self.var_dict["datatype"] tiletype = "FULL" if "tiletype" in self.var_dict: tiletype = self.var_dict["tiletype"] basetime = None if validtime is not None: basetime = self.get_basetime(validtime) var = SurfexFileVariable( varname, validtime=validtime, patches=patches, layers=layers, basetime=basetime, interval=self.interval, datatype=datatype, tiletype=tiletype, ) elif self.var_type == "fa": var = self.var_dict["name"] if "accumulated" in self.var_dict: accumulated = self.var_dict["accumulated"] elif self.var_type == "obs": if "varname" in self.var_dict: var = self.var_dict["varname"] else: var = None else: raise NotImplementedError instant = 0 if accumulated: instant = self.interval if "instant" in self.var_dict: instant = self.var_dict["instant"] if "prefer_forecast" in self.var_dict: self.prefer_forecast = self.var_dict["prefer_forecast"] return accumulated, instant, var
[docs] def read_variable(self, geo, validtime, cache=None): """Read the variable. Args: geo (surfex.Geometry): Geometry to read validtime (datetime.datetime): Valid time. cache (surfex.cache): Cache. Defaults to None Raises: RuntimeError: Negative accumulated value found Returns: np.darray: Field read and interpolated. """ # Set variable info previous_field = None if self.accumulated: # Re-read field previoustime = validtime - as_timedelta(seconds=self.interval) # Don't read if previous time is older than the very first basetime if previoustime >= self.initial_basetime: logging.debug("Re-read %s", previoustime) previous_field = self.read_var_points( self.file_var, geo, validtime=validtime, previoustime=previoustime, cache=cache, ) else: previous_field = np.zeros([geo.npoints]) elif self.instant > 0: previous_field = np.zeros([geo.npoints]) field = self.read_var_points(self.file_var, geo, validtime=validtime, cache=cache) # Deaccumulate if either two files are read or if instant is > 0. if self.accumulated or self.instant > 0: field = self.deaccumulate(field, previous_field, self.instant) if any(field[field < 0.0]): raise RuntimeError( "Negative accumulated value found for " + self.file_var ) return field
[docs] def print_variable_info(self): """Print variable.""" logging.debug(":%s:", str(self.var_dict))
[docs] def deaccumulate(self, field, previous_field, instant): """Deaccumulate variable. Args: field (_type_): _description_ previous_field (_type_): _description_ instant (_type_): _description_ Raises: RuntimeError: "Should not be negative values" Returns: _type_: _description_ """ logging.debug("Deaccumulate field with: %s", instant) if field is None: logging.warning("Field is not read properly") return None else: field = np.subtract(field, previous_field) if any(field[field < 0.0]): neg = [] for i in range(0, field.shape[0]): if field[i] < 0.0: neg.append(field[i]) neg = np.asarray(neg) logging.warning( "Deaccumulated field has %s negative values. lowest: %s mean: %s", str(neg.shape[0]), str(np.nanmin(neg)), str(np.nanmean(neg)), ) field[field < 0.0] = 0 if any(field[field < 0.0]): raise RuntimeError("Should not be negative values") if float(instant) != 0.0: field = np.divide(field, float(instant)) return field
[docs] def get_basetime(self, validtime, previoustime=None, allow_different_basetime=False): """Get the basetime of the file. Args: validtime (datetime.datetime): Valid time previoustime (datetime.datetime, optional): Previous valid time. Defaults to None. allow_different_basetime (bool, optional): Allow different base times. Defaults to False. Raises: RuntimeError: "Negative offset does not make sense here" NotImplementedError: _description_ Returns: datetime.datetime: Base time. """ if self.offset < 0: raise RuntimeError("Negative offset does not make sense here") # Take offset into account first = False offset = self.offset basetime = validtime - as_timedelta(seconds=offset) if basetime <= self.initial_basetime: first = True basetime = self.initial_basetime logging.debug(" validtime: %s", validtime) logging.debug(" previoustime: %s", previoustime) logging.debug(" first: %s", first) logging.debug(" fcint: %s", self.fcint) logging.debug(" offset: %s", self.offset) logging.debug(" initial_basetime: %s", self.initial_basetime) logging.debug(" Basetime with offset: %s", basetime) # Modify based on fcint seconds_since_midnight = int( ( basetime - basetime.replace(hour=0, minute=0, second=0, microsecond=0) ).total_seconds() ) if seconds_since_midnight == 86400: seconds_since_midnight = 0 basetime_inc = int( seconds_since_midnight / int(as_timedelta(seconds=self.fcint).total_seconds()) ) prefer_forecast = as_timedelta(seconds=0) if seconds_since_midnight == basetime_inc * int( as_timedelta(seconds=self.fcint).seconds ): if first: logging.debug("First basetime") else: if self.prefer_forecast: logging.debug("Prefer forecasts instead of analyis") prefer_forecast = as_timedelta(seconds=self.fcint) else: logging.debug("Prefer analysis instead of forecast") fcint = as_timedelta(seconds=self.fcint) basetime = ( basetime.replace(hour=0, minute=0, second=0, microsecond=0) + (basetime_inc * fcint) - prefer_forecast ) if previoustime is not None: if allow_different_basetime: raise NotImplementedError logging.debug("seconds_since_midnight: %s", seconds_since_midnight) logging.debug( " cycle seconds: %s", basetime_inc * int(as_timedelta(seconds=self.fcint).seconds), ) logging.debug(" prefer_forecast: %s", self.prefer_forecast) logging.debug(" prefer_forecast_inc: %s", prefer_forecast) logging.debug(" basetime_inc: %s", basetime_inc) logging.debug(" Basetime is: %s", basetime) return basetime
[docs] def rotate_geographic_wind(self, field, interpolator): """Rotate wind. Args: field (_type_): _description_ interpolator (_type_): _description_ Returns: _type_: _description_ """ rotate_wind = False if "rotate_to_geographic" in self.var_dict: rotate_wind = self.var_dict["rotate_to_geographic"] if rotate_wind: field = interpolator.rotate_wind_to_geographic(field) return field else: return field