"""Netcdf."""
import logging
import os
import re
from enum import Enum
import netCDF4
import numpy as np
try:
import cfunits
except ModuleNotFoundError:
cfunits = None
except AssertionError:
cfunits = None
except: # noqa
cfunits = None
from .datetime_utils import fromtimestamp, isdatetime, offsetaware, utcfromtimestamp
from .geo import ConfProj, Geo
from .interpolation import Interpolation
class Netcdf(object):
    """Netcdf input file reader."""

    def __init__(self, filename):
        """Construct NetCDF reader.

        Args:
            filename (str): Filename

        """
        self.filename = filename
        logging.debug("filename: %s", filename)
        self.file = netCDF4.Dataset(filename, "r")

    def nc_slice(
        self,
        var_name,
        levels=None,
        members=None,
        times=None,
        xcoords=None,
        ycoords=None,
        deaccumulate=False,
        instantanious=0.0,
        units=None,
        lev_from_ind=False,
    ):
        """Assemble a 5D field in order lon,lat,time,height,ensemble.

        Args:
            var_name (str): Name of field to retrieve
            levels (list): Height index. If None, return all.
            members (list): Ensemble index. If None, return all.
            times (list): Time index. If None, return all.
            xcoords: X-axis coordinates to subset
            ycoords: Y-axis coordinates to subset
            deaccumulate (bool): Deaccumulate field
            instantanious (float): Scaling factor to make an accumulated value as instantanius
            units (str): CF unit for the variable to be read
            lev_from_ind (bool): level list are indices and not values

        Raises:
            NotImplementedError: Subsetting of the input dimensions not implemented yet!
            ValueError: Times must be a list!
            ValueError: Levels must be a list!
            ValueError: Members must be a list!
            RuntimeError: No ensemble members found
            RuntimeError: cfunits not loaded!
            ValueError: Axis is not defined!

        Returns:
            tuple: (5D np.array with values, geo)

        """
        var = NetCDFFileVariable(self.file, var_name)
        if xcoords is not None or ycoords is not None:
            raise NotImplementedError(
                "Subsetting of the input dimensions not implemented yet!"
            )
        tinfo = ""
        if times is not None:
            tinfo = "time " + str(times) + " in "
        logging.info("Reading %s variable %s from %s", tinfo, var.var_name, self.filename)
        times_to_read = []
        prev_time_steps = []
        if times is None:
            # Read every time step; the "previous" step for deaccumulation of
            # the first step is the step itself (difference becomes zero).
            for i in range(0, var.times.shape[0]):
                times_to_read.append(i)
                if i > 0:
                    prev_time_steps.append(i - 1)
                else:
                    prev_time_steps.append(0)
        else:
            if not isinstance(times, (list, tuple)):
                raise ValueError("Times must be a list!")
            if isdatetime(times[0]):
                logging.debug("Time provided in call as datetime objects")
                times_in_var = var.datetimes
                for i, times_in_var_val in enumerate(times_in_var):
                    times_in_var_val = offsetaware(times_in_var_val)
                    logging.debug(
                        "i %s times_in_var %s times %s", i, times_in_var_val, times
                    )
                    for tval in times:
                        # Time steps requested
                        logging.debug(
                            "i=%s, times_in_var_val=%s tval=%s", i, times_in_var_val, tval
                        )
                        if times_in_var_val == tval:
                            times_to_read.append(i)
                            if i > 0:
                                prev_time_steps.append(i - 1)
                            else:
                                prev_time_steps.append(0)
            else:
                # Times are given as plain indices.
                times_in_var = var.times
                for i in range(0, times_in_var.shape[0]):
                    for times_val in times:
                        # Time steps requested
                        if i == times_val:
                            times_to_read.append(times_val)
                            if i > 0:
                                prev_time_steps.append(i - 1)
                            else:
                                prev_time_steps.append(0)
        logging.debug("times to read %s", times_to_read)
        levels_to_read = []
        if levels is None:
            for i in range(0, var.levels.shape[0]):
                levels_to_read.append(i)
        else:
            logging.debug("Level provided in call. lev_from_ind=%s", str(lev_from_ind))
            if not isinstance(levels, (list, tuple)):
                raise ValueError("Levels must be a list!")
            levels_in_var = var.levels
            for i in range(0, levels_in_var.shape[0]):
                for level_ind in levels:
                    if lev_from_ind:
                        if i == level_ind:
                            levels_to_read.append(i)
                    else:
                        # NB! Round number to avoid round off when matching
                        if round(float(levels_in_var[i]), 5) == round(
                            float(level_ind), 5
                        ):
                            levels_to_read.append(i)
        if len(levels_to_read) == 0:
            levels_to_read = [0]
        members_to_read = []
        if members is None:
            for i in range(0, var.members.shape[0]):
                members_to_read.append(i)
        else:
            if not isinstance(members, (list, tuple)):
                raise ValueError("Members must be a list!")
            logging.debug("Ensemble members provided in call")
            members_in_var = var.members
            for i in range(0, members_in_var.shape[0]):
                for member_val in members:
                    if members_in_var[i] == member_val:
                        members_to_read.append(i)
            if len(members_to_read) == 0:
                raise RuntimeError("No ensemble members found for " + var.var_name)
        lons = var.lons
        lats = var.lats
        # Dimensions of the "problem"
        dim_x = lons.shape[0]
        dim_y = lats.shape[1]
        logging.debug("lons.shape=%s lats.shape=%s", lons.shape, lats.shape)
        geo = Geo(lons, lats)
        dim_t = max(len(times_to_read), 1)
        dim_levels = max(len(levels_to_read), 1)
        dim_members = max(len(members_to_read), 1)
        logging.debug("Dimensions in output")
        logging.debug(
            "%s %s %s %s %s",
            str(dim_x),
            str(dim_y),
            str(dim_t),
            str(dim_levels),
            str(dim_members),
        )
        lon_ind = slice(0, dim_x, 1)
        lat_ind = slice(0, dim_y, 1)
        dims = []
        prev_dims = []
        types = var.axis_types
        mapping = {}  # Map axis to output axis
        for i, type_val in enumerate(types):
            if type_val == Axis.GEOX or type_val == Axis.LON:
                dims.append(lon_ind)
                prev_dims.append(lon_ind)
                mapping[0] = i
            elif type_val == Axis.GEOY or type_val == Axis.LAT:
                dims.append(lat_ind)
                prev_dims.append(lat_ind)
                mapping[1] = i
            elif type_val == Axis.TIME:
                dims.append(times_to_read)
                prev_dims.append(prev_time_steps)
                mapping[2] = i
            elif var.is_level(type_val):
                dims.append(levels_to_read)
                prev_dims.append(levels_to_read)
                mapping[3] = i
            elif type_val == Axis.REALIZATION:
                dims.append(members_to_read)
                prev_dims.append(members_to_read)
                mapping[4] = i
            else:
                raise ValueError(str(types[i]) + " is not defined!")
        logging.debug("Read %s with dimensions: %s", var.var_name, str(dims))
        if deaccumulate:
            logging.debug("Deaccumulate previous dimensions: %s", str(prev_dims))
        logging.debug("var.var_name: %s", var.var_name)
        logging.debug("dims %s", dims)
        logging.debug("self.file[var.var_name] %s", self.file[var.var_name])
        field = self.file[var.var_name][dims]
        if units is not None:
            if cfunits is None:
                raise RuntimeError("cfunits not loaded!")
            field = cfunits.Units.conform(
                field, cfunits.Units(var.units), cfunits.Units(units)
            )
        # Deaccumulation
        if deaccumulate:
            original_field = field
            previous_field = self.file[var.var_name][prev_dims]
            if units is not None:
                if cfunits is None:
                    raise RuntimeError("cfunits not loaded!")
                previous_field = cfunits.Units.conform(
                    previous_field, cfunits.Units(var.units), cfunits.Units(units)
                )
            field = np.subtract(original_field, previous_field)
        # Create instantanious values
        if instantanious > 0:
            field = np.divide(field, instantanious)
        # Add extra dimensions so the result is always 5D
        i = 0
        reverse_mapping = []
        for dim in range(0, 5):
            if dim not in mapping:
                logging.debug("Adding dimension %s", str(dim))
                field = np.expand_dims(field, len(dims) + i)
                reverse_mapping.append(len(dims) + i)
                i = i + 1
            else:
                reverse_mapping.append(mapping[dim])
        # Transpose to 5D array
        logging.debug("Transpose to 5D array")
        field = np.transpose(field, reverse_mapping)
        logging.debug("Read netcdf from %s times %s", self.filename, times)
        logging.debug("Shape of output: %s", str(field.shape))
        return field, geo

    def field(self, var_name, level=None, member=None, validtime=None, units=None):
        """Read a 2D field for one time/level/member.

        Args:
            var_name (str): Variable name
            level (int, optional): Level. Defaults to None.
            member (int, optional): Realization. Defaults to None.
            validtime (surfex.datetime_utils.as_datetime, optional): Validtime. Defaults to None.
            units (str, optional): Units. Defaults to None.

        Returns:
            tuple: Field, Geo

        """
        if validtime is None:
            validtime = []
        else:
            validtime = [validtime]
        logging.debug("level %s member %s validtime %s", level, member, validtime)
        field, geo_in = self.nc_slice(
            var_name, levels=level, members=member, times=validtime, units=units
        )
        # Reshape to fortran 2D style
        field = np.reshape(field, [geo_in.nlons, geo_in.nlats], order="F")
        return field, geo_in

    def points(self, var, geo, validtime=None, interpolation="bilinear"):
        """Read a field and interpolate it to requested positions.

        Args:
            var (NetCDFReadVariable): NetCDF variable
            geo (surfx.geo.Geo): Geometry
            validtime (surfex.datetime_utils.as_datetime, optional): Validtime. Defaults to None.
            interpolation (str, optional): Interpolation. Defaults to "bilinear".

        Returns:
            tuple: Field, Interpolator

        """
        var_name = var.name
        level = var.level
        member = var.member
        units = var.units
        logging.debug("level %s member %s validtime %s", level, member, validtime)
        field, geo_in = self.field(
            var_name, level=level, member=member, validtime=validtime, units=units
        )
        interpolator = Interpolation(interpolation, geo_in, geo)
        field = interpolator.interpolate(field)
        return field, interpolator
class Axis(Enum):
    """Axis types for netCDF dimensions."""

    UNDEFINED = 0
    GEOX = 1
    GEOY = 2
    GEOZ = 3
    TIME = 4
    LON = 5
    LAT = 6
    # NOTE: historical misspelling kept for backward compatibility;
    # prefer the PRESSURE alias below in new code.
    PESSURE = 7
    HEIGHT = 8
    REFERENCETIME = 9
    REALIZATION = 10
    HYBRID = 11
    # Correctly spelled alias of PESSURE (same value -> Enum alias).
    PRESSURE = 7
class NetCDFReadVariable(object):
    """Specification of a variable to read from a netCDF file."""

    def __init__(self, name, level=None, units=None, member=None):
        """Construct object.

        Args:
            name (str): Variable name in the netCDF file.
            level (int, optional): Level (index or value). Defaults to None.
            units (str, optional): CF units to convert the data to. Defaults to None.
            member (int, optional): Ensemble member. Defaults to None.

        """
        self.name = name
        self.level = level
        self.units = units
        self.member = member
class NetCDFFileVariable(object):
    """Accessor for one variable in an open netCDF file."""

    def __init__(self, file_handler, var):
        """Construct object.

        Args:
            file_handler (netCDF4.Dataset): Open netCDF dataset.
            var (str or NetCDFReadVariable): Variable name or read specification.

        """
        self.file = file_handler
        if isinstance(var, str):
            self.var_name = var
        else:
            self.var_name = var.name

    @property
    def axis_types(self):
        """Get axis_types.

        Raises:
            RuntimeError: Variable is missing in file.

        Returns:
            list: One Axis per dimension of the variable.

        """
        types = []
        if self.var_name not in self.file.variables:
            raise RuntimeError(self.var_name + " is missing in file!")
        if self.file.variables[self.var_name]:
            for dim_name in self.file.variables[self.var_name].dimensions:
                if dim_name == "longitude" or dim_name == "lon":
                    types.append(Axis.LON)
                elif dim_name == "x":
                    types.append(Axis.GEOX)
                elif dim_name == "latitude" or dim_name == "lat":
                    types.append(Axis.LAT)
                elif dim_name == "y":
                    types.append(Axis.GEOY)
                elif re.search("height[0-9]*", dim_name):
                    types.append(Axis.HEIGHT)
                elif re.search("hybrid[0-9]*", dim_name):
                    types.append(Axis.HYBRID)
                elif re.search("pressure[0-9]*", dim_name):
                    types.append(Axis.PESSURE)
                # TODO: GeoZ
                elif dim_name == "ensemble_member":
                    types.append(Axis.REALIZATION)
                elif dim_name == "time":
                    types.append(Axis.TIME)
                else:
                    types.append(Axis.UNDEFINED)
        return types

    @property
    def dim_names(self):
        """Get dim_names.

        Returns:
            list: Dimension names of the variable.

        """
        names = []
        if self.file.variables[self.var_name]:
            for dim_name in self.file.variables[self.var_name].dimensions:
                names.append(dim_name)
        return names

    @property
    def units(self):
        """Get units.

        Returns:
            str or None: The variable's "units" attribute, or None if absent or empty.

        """
        # getattr avoids an AttributeError when the variable carries
        # no "units" attribute at all.
        units = getattr(self.file.variables[self.var_name], "units", None)
        return units if units else None

    @property
    def lats(self):
        """Get lats.

        Raises:
            RuntimeError: No latitude found

        Returns:
            np.array: 2D array of latitudes

        """
        latvals = np.array([])
        axis_types = self.axis_types
        for i, axis_type in enumerate(axis_types):
            if axis_type == Axis.LAT:
                latvals = self.file.variables[self.dim_names[i]]
                logging.warning("Assumed to 2D in (lon,lat) order")
            elif axis_type == Axis.GEOY:
                # TODO: if lat/lon are 1D, create a 2D mesh
                # TODO: Assume the name for now. Must be found in attributes
                latvals = self.file.variables["latitude"]
                latvals = np.transpose(latvals, (1, 0))
        if latvals.shape[0] == 0:
            raise RuntimeError("No latitude found for " + self.var_name)
        return latvals

    @property
    def lons(self):
        """Get lons.

        Raises:
            RuntimeError: No longitude found

        Returns:
            np.array: 2D array of longitudes

        """
        lonvals = np.array([])
        axis_types = self.axis_types
        for i, axis_type in enumerate(axis_types):
            if axis_type == Axis.LON:
                lonvals = self.file.variables[self.dim_names[i]]
            elif axis_type == Axis.GEOX:
                # TODO: if lat/lon are 1D, create a 2D mesh
                # TODO: Assume the name for now. Must be found in attributes
                lonvals = self.file.variables["longitude"]
                lonvals = np.transpose(lonvals, (1, 0))
        if lonvals.shape[0] == 0:
            raise RuntimeError("No longitude found for " + self.var_name)
        return lonvals

    @property
    def datetimes(self):
        """Get datetimes.

        Raises:
            RuntimeError: cfunits not loaded!

        Returns:
            list: Time values converted to datetime objects.

        """
        times = []
        axis_types = self.axis_types
        for i, axis_type in enumerate(axis_types):
            if axis_type == Axis.TIME:
                val = self.file.variables[self.dim_names[i]]
                for tval in val:
                    if cfunits is None:
                        raise RuntimeError("cfunits not loaded!")
                    # Convert from the file's time unit to an epoch time.
                    epochtime = int(
                        cfunits.Units.conform(
                            tval,
                            cfunits.Units(val.units),
                            cfunits.Units("seconds since 1970-01-01 00:00:00"),
                        )
                    )
                    logging.debug("epoctime %s", epochtime)
                    d_t = utcfromtimestamp(epochtime)
                    logging.debug("dt %s", d_t)
                    times.append(d_t)
        if len(times) == 0:
            logging.debug("No time found for %s", self.var_name)
        return times

    @property
    def times(self):
        """Get times.

        Returns:
            np.array: 1D array of times

        """
        times = np.array([])
        axis_types = self.axis_types
        for i, axis_type in enumerate(axis_types):
            if axis_type == Axis.TIME:
                times = self.file.variables[self.dim_names[i]]
        if times.shape[0] == 0:
            logging.debug("No time found for %s", self.var_name)
        return times

    @property
    def members(self):
        """Get members.

        Returns:
            np.array: 1D array of ensemble members

        """
        members = np.array([])
        axis_types = self.axis_types
        for i, axis_type in enumerate(axis_types):
            if axis_type == Axis.REALIZATION:
                members = self.file.variables[self.dim_names[i]]
        if members.shape[0] == 0:
            logging.debug("No ensemble members found for %s", self.var_name)
        return members

    @property
    def levels(self):
        """Get levels.

        Returns:
            np.array: 1D array of levels

        """
        levels = np.array([])
        axis_types = self.axis_types
        for i, axis_type in enumerate(axis_types):
            if self.is_level(axis_type):
                levels = self.file.variables[self.dim_names[i]]
        if levels.shape[0] == 0:
            logging.debug("No levels found for %s", self.var_name)
        return levels

    @staticmethod
    def is_level(axis_type):
        """Check if axis type is a level type.

        Args:
            axis_type (Axis): Axis type

        Returns:
            bool: If axis is a level type

        """
        return axis_type in (Axis.HEIGHT, Axis.PESSURE, Axis.GEOZ, Axis.HYBRID)
def create_netcdf_first_guess_template(
    my_variables, my_nx, my_ny, fname="raw.nc", geo=None
):
    """Create netCDF template file for first guess.

    Creates (or overwrites) a file with time/longitude/latitude/x/y
    coordinate variables plus one (y, x) float variable per entry in
    my_variables. If geo is a ConfProj, the projection is stored as
    global attributes (gridtype "lambert").

    Args:
        my_variables (list): Variable names to create. Must be known keys
            in the internal attribute table.
        my_nx (int): Number of points in x.
        my_ny (int): Number of points in y.
        fname (str, optional): Output file name. Defaults to "raw.nc".
        geo (surfex.geo.Geo, optional): Geometry. Defaults to None.

    Returns:
        netCDF4.Dataset: The open dataset in write mode.

    """
    if os.path.exists(fname):
        os.remove(fname)
    my_fg = netCDF4.Dataset(fname, "w")
    my_fg.createDimension("y", my_ny)
    my_fg.createDimension("x", my_nx)
    my_fg.createDimension("time", 1)
    my_fg.createVariable("time", "f8", "time")
    my_fg.variables["time"].long_name = "time"
    my_fg.variables["time"].standard_name = "time"
    my_fg.variables["time"].units = "seconds since 1970-01-01 00:00:00 +00:00"
    my_fg.createVariable("longitude", "f8", ("y", "x"))
    my_fg.variables["longitude"].units = "degree_east"
    my_fg.variables["longitude"].long_name = "longitude"
    my_fg.variables["longitude"].standard_name = "longitude"
    my_fg.createVariable("latitude", "f8", ("y", "x"))
    my_fg.variables["latitude"].units = "degree_north"
    my_fg.variables["latitude"].long_name = "latitude"
    my_fg.variables["latitude"].standard_name = "latitude"
    my_fg.createVariable("x", "f4", "x")
    my_fg.variables["x"].long_name = "x-coordinate in Cartesian system"
    my_fg.variables["x"].standard_name = "projection_x_coordinate"
    my_fg.variables["x"].units = "m"
    my_fg.createVariable("y", "f4", "y")
    my_fg.variables["y"].long_name = "y-coordinate in Cartesian system"
    my_fg.variables["y"].standard_name = "projection_y_coordinate"
    my_fg.variables["y"].units = "m"
    # One table instead of four parallel dicts: (standard_name, long_name, units)
    var_attributes = {
        "air_temperature_2m": (
            "air_temperature",
            "Screen level temperature (T2M)",
            "K",
        ),
        "relative_humidity_2m": (
            "relative_humidity",
            "Screen level relative humidity (RH2M)",
            "1",
        ),
        "altitude": ("altitude", "Altitude", "m"),
        "surface_snow_thickness": (
            "surface_snow_thickness",
            "Surface snow thickness",
            "m",
        ),
        "sea_ice_thickness": ("sea_ice_thickness", "Sea ice thickness", "m"),
        "surface_soil_moisture": (
            "surface_soil_moisture",
            "Surface soil moisture",
            "m3/m3",
        ),
        "cloud_base": ("cloud_base", "Cloud base", "m"),
        "land_area_fraction": ("land_area_fraction", "Land Area Fraction", "1"),
    }
    # Same fill value for all fields (netCDF default float fill).
    fill_value = "9.96921e+36"
    for my_var in my_variables:
        std_name, lng_name, unit = var_attributes[my_var]
        my_fg.createVariable(my_var, "f4", ("y", "x"), fill_value=fill_value)
        my_fg.variables[my_var].long_name = lng_name
        my_fg.variables[my_var].standard_name = std_name
        my_fg.variables[my_var].units = unit
    # Global attributes
    if geo is not None:
        if isinstance(geo, ConfProj):
            my_fg.setncattr("gridtype", "lambert")
            my_fg.setncattr("dlon", float(geo.xdx))
            my_fg.setncattr("dlat", float(geo.xdy))
            my_fg.setncattr("projlat", float(geo.xlat0))
            my_fg.setncattr("projlat2", float(geo.xlat0))
            my_fg.setncattr("projlon", float(geo.xlon0))
            my_fg.setncattr("lonc", float(geo.xloncen))
            my_fg.setncattr("latc", float(geo.xlatcen))
    return my_fg
def read_first_guess_netcdf_file(input_file, var):
    """Read netCDF first guess file.

    Args:
        input_file (str): File name to read.
        var (str): Variable name of the background field.

    Raises:
        NotImplementedError: Only conf proj implemented when reading geo from file
        RuntimeError: cfunits not loaded!

    Returns:
        tuple: geo, validtime, background, glafs, gelevs

    """
    file_handler = netCDF4.Dataset(input_file)
    lons = file_handler["longitude"][:]
    lats = file_handler["latitude"][:]
    if cfunits is None:
        raise RuntimeError("cfunits not loaded!")
    # Convert the stored time value to an epoch time, then to a datetime.
    validtime = int(
        cfunits.Units.conform(
            file_handler["time"][:],
            cfunits.Units(file_handler["time"].units),
            cfunits.Units("seconds since 1970-01-01 00:00:00"),
        )
    )
    validtime = fromtimestamp(validtime)
    n_x = lons.shape[1]
    n_y = lons.shape[0]
    attrs = file_handler.ncattrs()
    if "gridtype" in attrs:
        # Geometry stored as global projection attributes.
        if file_handler.getncattr("gridtype") == "lambert":
            from_json = {
                "nam_conf_proj_grid": {
                    "nimax": n_x,
                    "njmax": n_y,
                    "xloncen": file_handler.getncattr("lonc"),
                    "xlatcen": file_handler.getncattr("latc"),
                    "xdx": file_handler.getncattr("dlon"),
                    "xdy": file_handler.getncattr("dlat"),
                },
                "nam_conf_proj": {
                    "xlon0": file_handler.getncattr("projlon"),
                    "xlat0": file_handler.getncattr("projlat"),
                },
            }
            geo = ConfProj(from_json)
        else:
            raise NotImplementedError
    else:
        # Fall back to a geometry built from the lon/lat fields,
        # transposed to (x, y) order.
        lons = np.array(np.transpose(np.reshape(lons, [n_y, n_x], order="F")))
        lats = np.array(np.transpose(np.reshape(lats, [n_y, n_x], order="F")))
        geo = Geo(lons, lats)
    # Fields are stored (y, x); transpose to (x, y) for the caller.
    background = file_handler[var][:]
    background = np.array(np.reshape(background, [n_x * n_y]))
    background = np.reshape(background, [n_y, n_x])
    background = np.transpose(background)
    fill_value = file_handler.variables[var].getncattr("_FillValue")
    logging.info("Field %s got %s. Fill with nan", var, str(fill_value))
    background[background == fill_value] = np.nan
    glafs = file_handler["land_area_fraction"][:]
    glafs = np.array(np.reshape(glafs, [n_x * n_y]))
    glafs = np.reshape(glafs, [n_y, n_x])
    glafs = np.transpose(glafs)
    gelevs = file_handler["altitude"][:]
    gelevs = np.array(np.reshape(gelevs, [n_x * n_y]))
    gelevs = np.reshape(gelevs, [n_y, n_x])
    gelevs = np.transpose(gelevs)
    file_handler.close()
    return geo, validtime, background, glafs, gelevs
def write_analysis_netcdf_file(
    filename, field, var, validtime, elevs, lafs, new_file=True, geo=None
):
    """Write analysis NetCDF file.

    Appends to filename if it exists, otherwise creates a new file from
    the first-guess template (which requires geo).

    Args:
        filename (str): Output file name.
        field (np.ndarray): Analysis field in (x, y) order. NaNs are
            replaced by the variable's _FillValue before writing.
        var (str): Variable name to write.
        validtime (datetime.datetime): Valid time of the analysis.
        elevs (np.ndarray): Altitudes in (x, y) order.
        lafs (np.ndarray): Land area fractions in (x, y) order.
        new_file (bool, optional): Force creation of a new file. Defaults to True.
        geo (surfex.geo.Geo, optional): Geometry, needed for a new file.
            Defaults to None.

    Raises:
        RuntimeError: You need to provide geo to write a new file

    """
    file_handler = None
    if os.path.exists(filename):
        file_handler = netCDF4.Dataset(filename, mode="a")
    else:
        new_file = True
    if new_file:
        if geo is None:
            raise RuntimeError("You need to provide geo to write a new file")
        file_handler = create_netcdf_first_guess_template(
            [var, "altitude", "land_area_fraction"],
            geo.nlons,
            geo.nlats,
            fname=filename,
            geo=geo,
        )
        file_handler.variables["longitude"][:] = np.transpose(geo.lons)
        file_handler.variables["latitude"][:] = np.transpose(geo.lats)
        file_handler.variables["x"][:] = list(range(0, geo.nlons))
        file_handler.variables["y"][:] = list(range(0, geo.nlats))
        file_handler.variables["altitude"][:] = np.transpose(elevs)
        file_handler.variables["land_area_fraction"][:] = np.transpose(lafs)
    # timestamp() is portable; strftime("%s") is a glibc-only extension.
    file_handler["time"][:] = float(validtime.timestamp())
    fill_value = file_handler.variables[var].getncattr("_FillValue")
    logging.info("Field %s got nan. Fill with %s", var, str(fill_value))
    field[np.where(np.isnan(field))] = fill_value
    file_handler[var][:] = np.transpose(field)
    file_handler.close()
def oi2soda(dtg, t2m=None, rh2m=None, s_d=None, s_m=None, output=None):
    """Convert analysis to ASCII obs file for SODA.

    Each provided dict must have keys "file" (netCDF file name) and
    "var" (variable name in that file). All fields must share the same
    grid dimensions. One line per grid point is written with the
    provided values space-separated, in order t2m, rh2m, s_d, s_m.

    Args:
        dtg (surfex.datetime_utils): Analysis time
        t2m (dict, optional): Screen level temperature var and file name. Defaults to None.
        rh2m (dict, optional): Screen level relative humidiy var and file name. Defaults to None.
        s_d (dict, optional): Snow depth var and file name. Defaults to None.
        s_m (dict, optional): Soil moisture var and file name. Defaults to None.
        output (str, optional): Output file name. Defaults to None.

    Raises:
        RuntimeError: You must specify at least one file to read from
        RuntimeError: Mismatch in ?? dimension

    """

    def check_input_to_soda_dimensions(my_nx, my_ny, nx1, ny1):
        # First field seen defines the reference dimensions (-1 = unset).
        if my_nx < 0:
            my_nx = nx1
        if my_ny < 0:
            my_ny = ny1
        if my_nx != nx1:
            raise RuntimeError(
                "Mismatch in nx dimension " + str(my_nx) + " != " + str(nx1)
            )
        if my_ny != ny1:
            raise RuntimeError(
                "Mismatch in ny dimension " + str(my_ny) + " != " + str(ny1)
            )
        return my_nx, my_ny

    cyy = dtg.strftime("%y")
    cmm = dtg.strftime("%m")
    cdd = dtg.strftime("%d")
    chh = dtg.strftime("%H")
    n_x = -1
    n_y = -1
    i = 0
    t2m_var = None
    rh2m_var = None
    sd_var = None
    sm_var = None
    if t2m is not None:
        t2m_fh = netCDF4.Dataset(t2m["file"], "r")
        logging.debug("T2m %s %s", t2m["var"], t2m_fh.variables[t2m["var"]].shape)
        i = i + 1
        n_x, n_y = check_input_to_soda_dimensions(
            n_x,
            n_y,
            t2m_fh.variables[t2m["var"]].shape[1],
            t2m_fh.variables[t2m["var"]].shape[0],
        )
        t2m_var = t2m_fh.variables[t2m["var"]][:]
        logging.debug("%s %s", t2m_var.shape, n_x * n_y)
        t2m_var = t2m_var.reshape([n_y * n_x], order="C")
        t2m_var = t2m_var.filled(fill_value=999.0)
        t2m_var = t2m_var.tolist()
    if rh2m is not None:
        rh2m_fh = netCDF4.Dataset(rh2m["file"], "r")
        logging.debug("RH2m %s %s", rh2m["var"], rh2m_fh.variables[rh2m["var"]].shape)
        i = i + 1
        n_x, n_y = check_input_to_soda_dimensions(
            n_x,
            n_y,
            rh2m_fh.variables[rh2m["var"]].shape[1],
            rh2m_fh.variables[rh2m["var"]].shape[0],
        )
        rh2m_var = rh2m_fh.variables[rh2m["var"]][:]
        rh2m_var = rh2m_var.reshape([n_y * n_x], order="C")
        rh2m_var = rh2m_var.filled(fill_value=999.0)
        rh2m_var = rh2m_var.tolist()
    if s_d is not None:
        sd_fh = netCDF4.Dataset(s_d["file"], "r")
        logging.debug("SD %s %s", s_d["var"], sd_fh.variables[s_d["var"]].shape)
        i = i + 1
        n_x, n_y = check_input_to_soda_dimensions(
            n_x,
            n_y,
            sd_fh.variables[s_d["var"]].shape[1],
            sd_fh.variables[s_d["var"]].shape[0],
        )
        sd_var = sd_fh.variables[s_d["var"]][:]
        sd_var = sd_var.reshape([n_y * n_x], order="C")
        # NB: snow depth uses a negative fill value, unlike the others.
        sd_var = sd_var.filled(fill_value=-999.0)
        sd_var = sd_var.tolist()
    if s_m is not None:
        sm_fh = netCDF4.Dataset(s_m["file"], "r")
        logging.debug("SM %s %s", s_m["var"], sm_fh.variables[s_m["var"]].shape)
        i = i + 1
        n_x, n_y = check_input_to_soda_dimensions(
            n_x,
            n_y,
            sm_fh.variables[s_m["var"]].shape[1],
            sm_fh.variables[s_m["var"]].shape[0],
        )
        sm_var = sm_fh.variables[s_m["var"]][:]
        sm_var = sm_var.reshape([n_y * n_x], order="C")
        sm_var = sm_var.filled(fill_value=999.0)
        sm_var = sm_var.tolist()
    if i == 0:
        raise RuntimeError("You must specify at least one file to read from!")
    if output is None:
        output = (
            "OBSERVATIONS_" + str(cyy) + str(cmm) + str(cdd) + "H" + str(chh) + ".DAT"
        )
    with open(output, mode="w", encoding="utf-8") as out:
        for i in range(0, n_x * n_y):
            line = ""
            if t2m_var is not None:
                line = line + " " + str(t2m_var[i])
            if rh2m_var is not None:
                line = line + " " + str(rh2m_var[i])
            if sd_var is not None:
                line = line + " " + str(sd_var[i])
            if sm_var is not None:
                line = line + " " + str(sm_var[i])
            line = line + "\n"
            out.write(line)
    logging.debug("i %s", i)
def read_cryoclim_nc(infiles, cryo_varname="classed_value_c"):
    """Read cryoclim netCDF file(s).

    Lon/lat grids are taken from the last readable file; the snow class
    field is taken from the first readable file.

    Args:
        infiles (list): Input files.
        cryo_varname (str, optional): Variable name in cryo file. Defaults to "classed_value_c"

    Raises:
        RuntimeError: "No files were read properly"

    Returns:
        tuple: grid_lons, grid_lats, grid_snow_class

    """
    grid_lons = None
    grid_lats = None
    grid_snow_class = None
    for filename in infiles:
        if os.path.exists(filename):
            logging.info("Reading: %s", filename)
            ncf = netCDF4.Dataset(filename, "r")
            grid_lons = ncf["lon"][:]
            grid_lats = ncf["lat"][:]
            grid_snow_class_read = ncf[cryo_varname][:]
            # Keep only the first snow class field read.
            if grid_snow_class is None:
                grid_snow_class = grid_snow_class_read
            ncf.close()
        else:
            logging.warning("Warning file %s does not exists", filename)
    if grid_lons is None or grid_lats is None or grid_snow_class is None:
        raise RuntimeError("No files were read properly")
    return grid_lons, grid_lats, grid_snow_class
def read_sentinel_nc(infiles):
    """Read sentinel nc files.

    Values are taken from the last readable file in infiles.

    Args:
        infiles (list): Input files.

    Raises:
        RuntimeError: "No files were read properly"

    Returns:
        tuple: longitudes, latitudes, soil moisture

    """
    grid_lons = None
    grid_lats = None
    grid_sm = None
    for filename in infiles:
        if os.path.exists(filename):
            logging.info("Reading: %s", filename)
            nch = netCDF4.Dataset(filename, "r")
            grid_lons = nch["LON"][:]
            grid_lats = nch["LAT"][:]
            grid_sm = nch["surface_soil_moisture"][:]
            nch.close()
        else:
            logging.warning("Warning file %s does not exists", filename)
    if grid_lons is None or grid_lats is None or grid_sm is None:
        raise RuntimeError("No files were read properly")
    return grid_lons, grid_lats, grid_sm