"""Surfex file related stuff."""
import abc
import logging
import os
import re
import shutil
import netCDF4
import numpy as np
import pyproj
from .datetime_utils import as_datetime, as_datetime_args, as_timedelta
from .fa import Fa
from .geo import IGN, ConfProj, LonLatReg, LonLatVal
from .interpolation import Interpolation
from .util import remove_existing_file
[docs]class SurfexIO(object):
"""Abstract Surfex IO class.
Used for internal surfex file format files.
"""
[docs] def __init__(self, filename, geo, extension):
"""Construct a surfex IO object.
Args:
filename (str): Name of file
geo (surfex.Geometry): Geometry
extension (str): File extension
"""
self.filename = filename
self.geo = geo
self.extension = extension
[docs] @abc.abstractmethod
def field(self, var, validtime=None):
"""Abstract method to read field.
Args:
var (SurfexFileVariable): Variable in surfex file.
validtime (datetime.datetime, optional): Valid time. Defaults to None.
Raises:
NotImplementedError: Must be implemented by child class.
"""
raise NotImplementedError("This method is not implemented for this class!")
[docs] @abc.abstractmethod
def points(self, var, geo_out, validtime=None, interpolation="bilinear"):
"""Abstract method to read points.
Args:
var (SurfexFileVariable): Variable in surfex file.
geo_out (surfex.Geometry): Surfex geometry to interpolate to.
validtime (datetime.datetime, optional): Valid time. Defaults to None.
interpolation (str, optional): Interpolation type.. Defaults to "nearest".
Raises:
NotImplementedError: Must be implemented by child class.
"""
raise NotImplementedError("This method is not implemented for this class!")
[docs] @staticmethod
def interpolate_field(field, geo_in, geo_out, interpolation="bilinear"):
"""Interpolate a field to points.
Args:
field (np.darray): Field to interpolate
geo_in (surfex.Geo): Input geometry
geo_out (surfex.Geo): Target geometry
interpolation (str, optional): Interpolation type. Defaults to "bilinear".
Returns:
tuple: (np.array, surfex.Interpolator)
"""
interpolator = Interpolation(interpolation, geo_in, geo_out)
field = interpolator.interpolate(field)
return field, interpolator
[docs]class SurfexSurfIO(object):
"""Surfex surf file.
Class for surfex restart files.
"""
def __init__(
self, surfexfile, csurf_filetype, input_file=None, symlink=True, archive_file=None
):
"""Construct the surfex surf file.
Args:
surfexfile (str): Internal file name of surfex file.
csurf_filetype (str): File type
input_file (str, optional): File name to be used as surfexfile. Defaults to None.
symlink (bool, optional): Symlink input_file to surfexfile. Defaults to True.
archive_file (str, optional): Location to store the result. Defaults to None.
"""
self.filename = surfexfile.filename
self.csurf_filetype = csurf_filetype
self.need_pgd = False
logging.debug("Input file: %s", input_file)
self.input_file = input_file
self.symlink = symlink
if self.input_file is not None:
if self.symlink:
self.symlink_input()
else:
self.copy_input()
self.archive_file = archive_file
[docs] def archive_output_file(self):
"""Archive the output file."""
if self.archive_file is not None:
dirname = os.path.dirname(os.path.abspath(self.archive_file))
os.makedirs(dirname, exist_ok=True)
f_in = os.getcwd() + "/" + self.filename
if os.path.abspath(self.archive_file) != f_in:
logging.info("Move %s to %s", f_in, self.archive_file)
if os.path.islink(self.archive_file):
os.unlink(self.archive_file)
if os.path.isfile(self.archive_file):
os.remove(self.archive_file)
if os.path.isdir(self.archive_file):
shutil.rmtree(self.archive_file)
shutil.move(f_in, self.archive_file)
[docs]class PGDFile(SurfexSurfIO):
"""PGD file."""
def __init__(
self,
csurf_filetype,
cpgdfile,
input_file=None,
symlink=True,
archive_file=None,
lfagmap=False,
masterodb=False,
):
"""Construct PGD file object.
Args:
csurf_filetype (str): File type
cpgdfile (str): Name of the PGD file
input_file (str, optional): Input file. Defaults to None.
symlink (bool, optional): Symlink input_file to surfexfile. Defaults to True.
archive_file (str, optional): Location to store the result. Defaults to None.
lfagmap (bool, optional): File use LFAGMAP. Defaults to False.
masterodb (bool, optional): File produced by masterodb. Defaults to False.
"""
logging.debug("PGDFile")
logging.debug("%s %s %s", cpgdfile, csurf_filetype, masterodb)
cpgdfile = get_surfex_io_object(
cpgdfile,
filetype="surf",
fileformat=csurf_filetype,
lfagmap=lfagmap,
masterodb=masterodb,
)
SurfexSurfIO.__init__(
self,
cpgdfile,
csurf_filetype,
input_file=input_file,
archive_file=archive_file,
symlink=symlink,
)
self.need_pgd = False
[docs]class PREPFile(SurfexSurfIO):
"""PREP file."""
def __init__(
self,
csurf_filetype,
cprepfile,
input_file=None,
symlink=True,
archive_file=None,
lfagmap=False,
masterodb=False,
):
"""Construct PREP file object.
Args:
csurf_filetype (str): File type
cprepfile (str): Name of the PREP file
input_file (str, optional): Input file. Defaults to None.
symlink (bool, optional): Symlink input_file to surfexfile. Defaults to True.
archive_file (str, optional): Location to store the result. Defaults to None.
lfagmap (bool, optional): File use LFAGMAP. Defaults to False.
masterodb (bool, optional): File produced by masterodb. Defaults to False.
"""
logging.debug("PREPFile %s", input_file)
cprepfile = get_surfex_io_object(
cprepfile,
filetype="surf",
fileformat=csurf_filetype,
lfagmap=lfagmap,
masterodb=masterodb,
)
SurfexSurfIO.__init__(
self,
cprepfile,
csurf_filetype,
input_file=input_file,
archive_file=archive_file,
symlink=symlink,
)
self.need_pgd = True
[docs]class SURFFile(SurfexSurfIO):
"""SURFOUT file."""
def __init__(
self,
csurf_filetype,
csurffile,
archive_file=None,
input_file=None,
lfagmap=False,
masterodb=False,
):
"""Construct SURFOUT file object.
Result of a surfex binary.
Args:
csurf_filetype (str): File type
csurffile (str): Name of the PREP file
input_file (str, optional): Input file. Defaults to None.
archive_file (str, optional): Location to store the result. Defaults to None.
lfagmap (bool, optional): File use LFAGMAP. Defaults to False.
masterodb (bool, optional): File produced by masterodb. Defaults to False.
"""
logging.debug("SURFFile")
csurffile = get_surfex_io_object(
csurffile,
filetype="surf",
fileformat=csurf_filetype,
lfagmap=lfagmap,
masterodb=masterodb,
)
SurfexSurfIO.__init__(
self,
csurffile,
csurf_filetype,
input_file=input_file,
archive_file=archive_file,
)
self.need_pgd = True
[docs]class SurfexFileVariable(object):
"""Surfex Variable."""
def __init__(
self,
varname,
validtime=None,
patches=1,
layers=1,
basetime=None,
interval=None,
datatype="float",
tiletype="FULL",
):
"""Construct a surfex file variable.
Args:
varname (str): Variable name.
validtime (datetime.datetime, optional): Valid time. Defaults to None.
patches (int, optional): Number of patches. Defaults to 1.
layers (int, optional): Number of layers. Defaults to 1.
basetime (datetime.datetime, optional): _description_. Defaults to None.
interval (int, optional): Interval. Defaults to None.
datatype (str, optional): Data type for variable. Defaults to "float".
tiletype (str, optional): Tiletype
"""
self.varname = varname
self.type = datatype
self.patches = patches
self.layers = layers
self.basetime = basetime
self.interval = interval
self.validtime = validtime
self.datatype = datatype
self.tiletype = tiletype
[docs] def print_var(self):
"""Print variable information."""
return self.varname
def get_surfex_io_object(
fname, filetype="surf", fileformat=None, geo=None, lfagmap=False, masterodb=False
):
"""Get the surfexIO object.
Args:
fname (str): File name.
filetype (str, optional): File type. Defaults to "surf".
fileformat (str, optional): File format. Defaults to None.
geo (surfex.Geo, optional): Geometry. Defaults to None.
lfagmap (bool, optional): File use LFAGMAP. Defaults to False.
masterodb (bool, optional): File produced by masterodb. Defaults to False.
Raises:
RuntimeError: Invalid filetype
NotImplementedError: Filetype not implemented
RuntimeError: Format needs a geometry
Returns:
SurfexIO: SurfexIO object.
"""
logging.debug("get_surfex_io_object")
if filetype is not None:
if (
filetype.lower() != "surf"
and filetype.lower() != "ts"
and filetype.lower() != "forcing"
):
raise RuntimeError(
"Invalid filetype: " + filetype + " Allowed: surf/ts/forcing"
)
if fileformat is None:
fileformat, filetype = guess_file_format(fname, filetype)
if fileformat.lower() == "ascii":
if filetype.lower() == "surf":
obj = AsciiSurfexFile(fname, geo=geo)
elif filetype.lower() == "forcing":
raise NotImplementedError("Not implemented yet")
else:
raise NotImplementedError
elif fileformat.lower() == "nc":
if filetype.lower() == "surf":
obj = NCSurfexFile(fname, geo=geo)
else:
raise NotImplementedError
elif fileformat.lower() == "netcdf":
if filetype.lower() == "ts":
if geo is None:
raise RuntimeError("Format NetCDF needs a geometry")
obj = NetCDFSurfexFile(fname, geo)
elif filetype.lower() == "forcing":
if geo is None:
raise RuntimeError(
"Format NetCDF needs a geometry for reading forcing files"
)
obj = ForcingFileNetCDF(fname, geo)
else:
raise NotImplementedError
elif fileformat.lower() == "texte":
if geo is None:
raise RuntimeError("Format TEXTE needs a geometry")
obj = TexteSurfexFile(fname, geo)
elif fileformat.lower() == "fa":
if filetype.lower() == "surf":
obj = FaSurfexFile(fname, geo=geo, lfagmap=lfagmap, masterodb=masterodb)
else:
raise NotImplementedError
else:
raise NotImplementedError("Format not implemented: " + fileformat)
logging.debug("Returning object: %s", obj)
return obj
[docs]class AsciiSurfexFile(SurfexIO):
"""Input from an ASCII surfex file (.txt)."""
[docs] def __init__(self, filename, geo=None):
"""Construct the ASCII object.
Args:
filename (str): Filename
geo(surfex.geo.Geo, optional): Geometry, Defaults to None.
"""
suffix = SurfFileTypeExtension("ASCII").suffix
self.filename = filename
if geo is None:
geo = self.get_geo()
if not filename.endswith(suffix):
filename = filename + suffix
SurfexIO.__init__(self, filename, geo, "txt")
[docs] def get_geo(self):
"""Get the geometry object.
Raises:
FileNotFoundError: _description_
RuntimeError: No grid found
NotImplementedError: _description_
Returns:
surfex.Geometry: Surfex geometry
"""
if not os.path.isfile(self.filename):
raise FileNotFoundError("File does not exist: " + str(self.filename))
grid = self.read("GRID_TYPE", "FULL", "string")
if grid is None:
raise RuntimeError("No grid found")
if grid == "IGN":
domain = {
"nam_ign": {
"clambert": self.read("LAMBERT", "&FULL", "integer"),
"npoints": self.read("NPOINTS", "&FULL", "integer"),
"xx": self.read("XX", "&FULL", "float"),
"xy": self.read("XY", "&FULL", "float"),
"xdx": self.read("XDX", "&FULL", "float"),
"xdy": self.read("XY", "&FULL", "float"),
"xx_llcorner": self.read("XX_LLCORNER", "&FULL", "float"),
"xy_llcorner": self.read("XY_LLCORNER", "&FULL", "float"),
"xcellsize": self.read("XCELLSIZE", "&FULL", "float"),
"ncols": self.read("NCOLS", "&FULL", "integer"),
"nrows": self.read("NROWS", "&FULL", "integer"),
}
}
return IGN(domain)
elif grid == "LONLATVAL":
domain = {
"nam_lonlatval": {
"xx": self.read("XX", "&FULL", "float"),
"xy": self.read("XY", "&FULL", "float"),
"xdx": self.read("DX", "&FULL", "float"),
"xdy": self.read("DY", "&FULL", "float"),
}
}
return LonLatVal(domain)
elif grid == "LONLAT REG":
domain = {
"nam_lonlat_reg": {
"xlonmin": self.read("LONMIN", "&FULL", "float"),
"xlatmin": self.read("LATMIN", "&FULL", "float"),
"xlonmax": self.read("LONMAX", "&FULL", "float"),
"xlatmax": self.read("LATMAX", "&FULL", "float"),
"nlon": self.read("NLON", "&FULL", "integer"),
"nlat": self.read("NLAT", "&FULL", "integer"),
}
}
return LonLatReg(domain)
elif grid == "CONF PROJ":
lon0 = self.read("LON0", "&FULL", "float")
lat0 = self.read("LAT0", "&FULL", "float")
n_x = self.read("IMAX", "&FULL", "integer")
n_y = self.read("JMAX", "&FULL", "integer")
d_x = self.read("XX", "&FULL", "float")
d_y = self.read("XX", "&FULL", "float")
if d_x.shape[0] > 1:
d_x = d_x[1] - d_x[0]
if d_y.shape[0] > 1:
d_y = d_y[1] - d_y[0]
ll_lon = self.read("LONORI", "&FULL", "float")
ll_lat = self.read("LATORI", "&FULL", "float")
logging.info(
"lon0=%s lat0=%s n_x=%s, n_y=%s d_x=%s dy=%s, ll_lon=%s, ll_lat=%s",
lon0,
lat0,
n_x,
n_y,
d_x,
d_y,
ll_lon,
ll_lat,
)
earth = 6.37122e6
proj_string = (
f"+proj=lcc +lat_0={str(lat0)} +lon_0={str(lon0)} +lat_1={str(lat0)} "
f"+lat_2={str(lat0)} +units=m +no_defs +R={str(earth)}"
)
proj = pyproj.CRS.from_string(proj_string)
wgs84 = pyproj.CRS.from_string("EPSG:4326")
x_0, y_0 = pyproj.Transformer.from_crs(wgs84, proj, always_xy=True).transform(
ll_lon, ll_lat
)
x_c = x_0 + 0.5 * (n_x - 1) * d_x
y_c = y_0 + 0.5 * (n_y - 1) * d_y
lonc, latc = pyproj.Transformer.from_crs(
proj, wgs84, always_xy=True
).transform(x_c, y_c)
domain = {
"nam_conf_proj": {"xlon0": lon0, "xlat0": lat0},
"nam_conf_proj_grid": {
"xloncen": lonc,
"xlatcen": latc,
"nimax": n_x,
"njmax": n_y,
"xdx": d_x,
"xdy": d_x,
"ilone": 0,
"ilate": 0,
},
}
return ConfProj(domain)
else:
raise NotImplementedError("Grid " + str(grid[0]) + " not implemented!")
[docs] def read(self, read_par, read_tile, datatype):
"""Read the file.
Args:
read_par (str): Parameter to read
read_tile (str): Tile to read
datatype (str): Datatype
Raises:
NotImplementedError: Datatype not implemented
RuntimeError: Could not read datatype
Returns:
numpy.array: Values read
"""
# Add & if not given
if read_tile.find("&") < 0:
read_tile = "&" + read_tile
file = open(self.filename, mode="r", encoding="utf-8")
read_desc = False
read_value = False
values = []
for line in file:
words = line.split()
print(read_value, read_desc)
print(words)
if len(words) > 0:
if read_value and not read_desc:
if words[0].find("&") < 0:
try:
if datatype.lower() == "float":
for word in words:
val = float(word.replace("D", "E"))
if val == 1e20:
val = np.nan
values.append(val)
elif datatype.lower() == "string":
str_words = []
for word in words:
str_words.append(word)
values = " ".join(str_words)
elif (
datatype.lower() == "integer" or datatype.lower() == "int"
):
for word in words:
values.append(int(word))
elif (
datatype.lower() == "logical"
or datatype.lower() == "bool"
):
for word in words:
if word.lower().strip()[0] == "t":
values = True
else:
values = False
else:
raise NotImplementedError(
"Type not implemented " + str(datatype)
)
except ValueError:
raise RuntimeError(
f"Conversion from {str(words)} to {str(datatype)} "
"does not work! Try a different datatype!"
) from ValueError
if read_desc:
read_desc = False
read_value = True
if words[0].find("&") >= 0:
tile = words[0]
par = words[1]
read_value = False
if (
tile.strip().lower() == read_tile.lower()
and par.lower() == read_par.lower()
):
read_desc = True
read_value = False
logging.info("Found: %s %s", str(tile), str(par))
# Description could be empty
else:
if read_desc:
read_desc = False
read_value = True
if isinstance(values, list):
if len(values) == 0:
logging.info("No values found for %s", read_par)
return None
if len(values) > 1:
values = np.asarray(values)
else:
values = values[0]
logging.info("Returning values: %s", values)
return values
[docs] def field(self, var, validtime=None):
"""Read a field in file.
Args:
var (SurfexFileVariable): Variable in surfex file.
validtime (datetime.datetime, optional): Valid time. Defaults to None.
Returns:
np.darray: Field, surfex.Geo in read file
"""
read_par = var.varname
read_tile = var.tiletype
datatype = var.datatype
field = self.read(read_par, read_tile, datatype)
geo_in = self.get_geo()
field = np.reshape(field, [geo_in.nlons, geo_in.nlats], order="F")
return field, geo_in
[docs] def points(self, var, geo_out, validtime=None, interpolation="nearest"):
"""Read points.
Args:
var (SurfexFileVariable): Variable in surfex file.
geo_out (surfex.Geo): Surfex geometry for points.
validtime (datetime.datetime, optional): Valid time. Defaults to None.
interpolation (str, optional): Interpolation type. Defaults to "nearest".
Returns:
np.darray: Interpolated points
"""
field, geo_in = self.field(var, validtime=validtime)
points, interpolator = SurfexIO.interpolate_field(
field, geo_in, geo_out, interpolation=interpolation
)
return points, interpolator
[docs]class NCSurfexFile(SurfexIO):
"""NetCDF surfex file (restart type)."""
[docs] def __init__(self, filename, geo=None):
"""Construct NC file object.
Args:
filename (str): File name
geo (surfex.Geo, optional): Surfex geometry. Defaults to None.
"""
suffix = SurfFileTypeExtension("NC").suffix
if not filename.endswith(suffix):
filename = filename + suffix
self.filename = filename
if geo is None:
geo = self.get_geo()
SurfexIO.__init__(self, filename, geo, "nc")
[docs] def get_geo(self):
"""Get geometry in file.
Raises:
NotImplementedError: Grid not implemented
Returns:
surfex.Geometry: Surfex geometry in file
"""
try:
f_h = netCDF4.Dataset(self.filename, "r")
except OSError:
logging.debug("Geo not open %s as a netCDF file", self.filename)
return None
cgrid = str(netCDF4.chartostring(f_h["GRID_TYPE"][:])).strip()
if cgrid == "CONF PROJ":
lon0 = float(f_h["LON0"][0])
lat0 = float(f_h["LAT0"][0])
n_x = int(f_h["IMAX"][0])
n_y = int(f_h["JMAX"][0])
d_x = float(f_h["DX"][0][0])
d_y = float(f_h["DY"][0][0])
ll_lon = float(f_h["LONORI"][0])
ll_lat = float(f_h["LATORI"][0])
earth = 6.37122e6
proj_string = (
f"+proj=lcc +lat_0={str(lat0)} +lon_0={str(lon0)} +lat_1={str(lat0)} "
f"+lat_2={str(lat0)} +units=m +no_defs +R={str(earth)}"
)
proj = pyproj.CRS.from_string(proj_string)
wgs84 = pyproj.CRS.from_string("EPSG:4326")
x_0, y_0 = pyproj.Transformer.from_crs(wgs84, proj, always_xy=True).transform(
ll_lon, ll_lat
)
x_c = x_0 + 0.5 * (n_x + 1) * d_x
y_c = y_0 + 0.5 * (n_y + 1) * d_y
lonc, latc = pyproj.Transformer.from_crs(
proj, wgs84, always_xy=True
).transform(x_c, y_c)
domain = {
"nam_conf_proj": {"xlon0": lon0, "xlat0": lat0},
"nam_conf_proj_grid": {
"xloncen": lonc,
"xlatcen": latc,
"nimax": n_x,
"njmax": n_y,
"xdx": d_x,
"xdy": d_y,
"ilone": 0,
"ilate": 0,
},
}
return ConfProj(domain)
elif cgrid == "IGN":
domain = {
"nam_ign": {
"clambert": f_h["CLAMBERT"][0],
"xx": f_h["XX"][:],
"xy": f_h["XY"][:],
"xdx": f_h["DX"][:],
"xdy": f_h["DY"][:],
}
}
return IGN(domain)
elif cgrid == "LONLATVAL":
domain = {
"nam_lonlatval": {
"xx": f_h["XX"][:],
"xy": f_h["XY"][:],
"xdx": f_h["DX"][:],
"xdy": f_h["DY"][:],
}
}
return LonLatVal(domain)
elif cgrid == "LONLAT REG":
domain = {
"nam_lonlatval_reg": {
"lonmin": f_h["LONMIN"][0],
"latmin": f_h["LATMIN"][0],
"lonmax": f_h["LONMAX"][0],
"latmax": f_h["LATMAX"][0],
"nlon": f_h["NLON"][0],
"nlat": f_h["NLAT"][0],
"reg_lon": f_h["REG_LON"][0],
"reg_lat": f_h["REG_LAT"][0],
}
}
return LonLatReg(domain)
else:
raise NotImplementedError(cgrid + " is not implemented")
[docs] def field(self, var, validtime=None):
"""Read field.
Args:
var (SurfexFileVariable): Variable in surfex file.
validtime (datetime.datetime, optional): Valid time. Defaults to None.
validtime (_type_, optional): _description_. Defaults to None.
Raises:
RuntimeError: Mismatch in times in file and the wanted time
Returns:
np.darray: Field, surfex.Geo in read file
"""
f_h = netCDF4.Dataset(self.filename, "r")
if validtime is None:
pass
else:
time_in_file = None
try:
year = f_h["DTCUR-YEAR"][0]
month = f_h["DTCUR-MONTH"][0]
day = f_h["DTCUR-DAY"][0]
time = int(f_h["DTCUR-TIME"][0])
hour = int(time / 3600)
# TODO minutes
time_in_file = as_datetime_args(
year=year, month=month, day=day, hour=hour
)
except IndexError:
logging.warning("Could not checking time")
if time_in_file is not None:
if validtime != time_in_file:
logging.error(
"time_in_file=%s validtime=%s %s %s",
time_in_file,
validtime,
type(time_in_file),
type(validtime),
)
raise RuntimeError("Mismatch in times in file and the wanted time")
geo_in = self.get_geo()
field = f_h[var.varname][:]
fillvalue = f_h[var.varname].getncattr("_FillValue")
logging.info("Set %s to nan", fillvalue)
field = field.filled(np.nan)
# Reshape to fortran 2D style
field = np.transpose(field)
return field, geo_in
[docs] def points(self, var, geo_out, validtime=None, interpolation="nearest"):
"""Read points.
Args:
var (SurfexFileVariable): Variable in surfex file.
geo_out (surfex.Geo): Surfex geometry for points.
validtime (datetime.datetime, optional): Valid time. Defaults to None.
interpolation (str, optional): Interpolation type. Defaults to "nearest".
Returns:
np.darray: Interpolated points
"""
field, geo_in = self.field(var, validtime=validtime)
points, interpolator = SurfexIO.interpolate_field(
field, geo_in, geo_out, interpolation=interpolation
)
return points, interpolator
[docs]class FaSurfexFile(SurfexIO):
"""FA surfex file."""
[docs] def __init__(self, filename, geo=None, masterodb=False, lfagmap=True):
"""Construct the surfex FA file.
Args:
filename (str): File name.
geo (surfex.Geo, optional): Surfex geometry. Defaults to None.
masterodb (bool, optional): _description_. Defaults to False.
lfagmap (bool, optional): _description_. Defaults to True.
"""
extension = SurfFileTypeExtension("FA", masterodb=masterodb, lfagmap=lfagmap)
extension_suffix = extension.suffix
# Surfex binaries use .fa suffix for all input files no matter if they are lfagmap files
if not filename.endswith(extension_suffix):
filename = filename + extension_suffix
SurfexIO.__init__(self, filename, geo, extension)
self.lfagmap = lfagmap
[docs] def field(self, var, validtime=None):
"""Read field from FA file.
Args:
var (SurfexFileVariable): Variable in surfex file.
validtime (datetime.datetime, optional): Valid time. Defaults to None.
Returns:
np.darray: Field, surfex.Geo in read file
"""
file_handler = Fa(self.filename)
field, geo_in = file_handler.field(var.varname, validtime)
# Reshape to fortran 2D style
logging.debug("field=%s, field.shape=%s", field, field.shape)
field = np.reshape(field, [geo_in.nlons, geo_in.nlats], order="F")
return field, geo_in
[docs] def points(self, var, geo_out, validtime=None, interpolation="nearest"):
"""Read points.
Args:
var (SurfexFileVariable): Variable in surfex file.
geo_out (surfex.Geo): Surfex geometry for points.
validtime (datetime.datetime, optional): Valid time. Defaults to None.
interpolation (str, optional): Interpolation type. Defaults to "nearest".
Returns:
tuple: Interpolated points, surfex.Interpolator
"""
field, geo_in = self.field(var, validtime=validtime)
points, interpolator = SurfexIO.interpolate_field(
field, geo_in, geo_out, interpolation=interpolation
)
return points, interpolator
[docs]class SurfFileTypeExtension(object):
"""Extension of a surfex file."""
[docs] def __init__(self, csurf_filetype, masterodb=False, lfagmap=True):
"""Construct an extension object.
Args:
csurf_filetype (str): Surfex file type.
masterodb (bool, optional): File created with masterodb. Defaults to False.
lfagmap (bool, optional): File created with lfagmap=True. Defaults to True.
"""
suffix = None
extension = ""
if csurf_filetype.lower() == "nc":
extension = "nc"
elif csurf_filetype.lower() == "ascii":
extension = "txt"
elif csurf_filetype.lower() == "fa":
if lfagmap:
extension = "sfx"
if not masterodb:
suffix = ".fa"
else:
extension = "fa"
if suffix is None:
suffix = "." + extension
self.type = extension
self.suffix = suffix
[docs]class NetCDFSurfexFile(SurfexIO):
"""Reading surfex NetCDF time series output."""
[docs] def __init__(self, filename, geo):
"""Construct a NetCDF surfex time series file.
Args:
filename (str): Name of file.
geo (surfex.Geometry): Surfex geometry for time series.
"""
self.file_handler = netCDF4.Dataset(filename, "r")
SurfexIO.__init__(self, filename, geo, "nc")
[docs] def read(self, var, times):
"""Read a field, return a 2D array.
Args:
var (SurfexFileVariable): Variable in surfex file.
times (list): List of datetime.datetime objects
Raises:
ValueError: times must be list or tuple
ValueError: patches must be list or tuple
ValueError: patches must be list or tuple
RuntimeError: Variable not found!
NotImplementedError: Dimension not implemented
RuntimeError: Mismatch in points
Returns:
(tuple): (np.array. surfex.Geometry)
"""
layers = var.layers
patches = var.patches
if not isinstance(times, (list, tuple)):
raise ValueError("times must be list or tuple")
if not isinstance(layers, (list, tuple)):
raise ValueError("patches must be list or tuple")
if not isinstance(patches, (list, tuple)):
raise ValueError("patches must be list or tuple")
values = np.array([])
times_read = []
ndims = 0
dim_indices = []
mapping = {}
npatch = 1
if self.file_handler.variables[var.varname].shape[0] > 0:
for dim in self.file_handler.variables[var.varname].dimensions:
dimlen = self.file_handler.variables[var.varname].shape[ndims]
this_dim = []
if dim == "time":
mapping[0] = ndims
times_for_var = self.file_handler.variables["time"]
units = times_for_var.units
try:
t_cal = times_for_var.calendar
except AttributeError: # Attribute doesn't exist
t_cal = "gregorian" # or standard
indices = list(range(0, dimlen))
times_for_var = netCDF4.num2date(
times_for_var[indices], units=units, calendar=t_cal
)
if len(times) > 0:
for t_to_find, t_to_find_val in enumerate(times):
for tstep in range(0, len(indices)):
if times_for_var[tstep] == t_to_find_val:
this_dim.append(tstep)
times_read.append(times[t_to_find])
else:
times_read = times_for_var
this_dim = list(range(0, dimlen))
elif dim == "Number_of_points":
mapping[1] = ndims
this_dim = list(range(0, dimlen))
elif dim == "xx":
mapping[1] = ndims
this_dim = list(range(0, dimlen))
elif dim == "yy":
mapping[2] = ndims
this_dim = list(range(0, dimlen))
elif dim == "Number_of_Patches":
mapping[3] = ndims
npatch = dimlen
if len(patches) > 0:
npatch = len(patches)
this_dim = patches
else:
this_dim = list(range(0, dimlen))
elif dim == "Number_of_Layers":
mapping[4] = ndims
npatch = dimlen
if len(layers) > 0:
this_dim = layers
else:
this_dim = list(range(0, dimlen))
elif dim == "lon":
mapping[1] = ndims
this_dim = list(range(0, dimlen))
elif dim == "lat":
mapping[2] = ndims
this_dim = list(range(0, dimlen))
else:
raise NotImplementedError("Not implemented for: " + dim)
dim_indices.append(this_dim)
ndims = ndims + 1
print(dim_indices)
field = self.file_handler.variables[var.varname][dim_indices]
# Add extra dimensions
i = 0
reverse_mapping = []
for dim in range(0, 5):
if dim not in mapping:
logging.debug("Adding dimension %s", dim)
field = np.expand_dims(field, len(dim_indices) + i)
reverse_mapping.append(len(dim_indices) + i)
i = i + 1
else:
reverse_mapping.append(mapping[dim])
# Transpose to 5D array
field = np.transpose(field, reverse_mapping)
npoints = self.geo.npoints * npatch
# Create 2-D array with times and points as for the other formats
for tstep in range(0, field.shape[0]):
field2d = np.empty(npoints)
i = 0
for (patch,) in range(0, npatch):
if self.geo.mask is not None:
iii = 0
j = 0
# For some reason the NetCDF files have one less dimension in all
# dimensions than the PGD dimension and mask needs x first.
for xxx in range(-1, field.shape[1] + 1):
for yyy in range(-1, field.shape[2] + 1):
if xxx in range(0, field.shape[1]) and yyy in range(
0, field.shape[2]
):
if self.geo.mask[j] == iii:
field2d[i] = np.nan
if field[tstep, xxx, yyy, patch] != np.nan:
field2d[i] = field[tstep, xxx, yyy, patch]
i = i + 1
j = j + 1
iii = iii + 1
else:
for yyy in range(0, field.shape[2]):
for xxx in range(0, field.shape[1]):
field2d[i] = np.nan
if field[tstep, xxx, yyy, patch] != np.nan:
field2d[i] = field[tstep, xxx, yyy, patch]
i = i + 1
if i != npoints:
raise RuntimeError(
"Mismatch in points " + str(i) + "!=" + str(npoints)
)
values = np.append(values, field2d)
# Re-shape to proper format
values = np.reshape(values, [field.shape[0], npoints])
else:
raise RuntimeError("Variable " + var.varname + " not found!")
return values, self.geo
[docs] def field(self, var, validtime=None):
"""Read field.
Args:
var (SurfexFileVariable): Variable in surfex file.
validtime (datetime.datetime, optional): Valid time. Defaults to None.
Returns:
tuple: (np.array, surfex.Geometry)
"""
if validtime is None:
validtime = []
else:
validtime = [validtime]
field, geo_in = self.read(var, validtime)
# Reshape to fortran 2D style
field = np.reshape(field, [geo_in.nlons, geo_in.nlats], order="F")
return field, geo_in
[docs] def points(self, var, geo_out, validtime=None, interpolation="nearest"):
"""Read points.
Args:
var (SurfexFileVariable): Variable in surfex file.
geo_out (surfex.Geometry): Surfex geometry to interpolate to.
validtime (datetime.datetime, optional): Valid time. Defaults to None.
interpolation (str, optional): Interpolation type.. Defaults to "nearest".
Returns:
tuple: points, surfex.Interpolator
"""
field, geo_in = self.field(var, validtime=validtime)
points, interpolator = SurfexIO.interpolate_field(
field, geo_in, geo_out, interpolation=interpolation
)
return points, interpolator
[docs]class TexteSurfexFile(SurfexIO):
"""Reading surfex TEXTE output."""
[docs] def __init__(self, filename, geo):
"""Construct the Texte file.
Args:
filename (str): Filename
geo (surfex.Geometry): Geometry
"""
self.file = None
SurfexIO.__init__(self, filename, geo, "TXT")
[docs] def read(self, variable, times):
"""Read file.
Args:
variable (SurfexFileVariable): Variable in surfex file.
times (list): List of datetime.datetime to read.
Raises:
RuntimeError: Basetime must be set for TEXTE
RuntimeError: Interval must be set for TEXTE
RuntimeError: times must be list or tuple
RuntimeError: Dimension of domain does not match end of line
Returns:
tuple: (np.array, surfex.Geometry)
"""
self.file = open(self.filename, mode="r", encoding="utf-8")
base_time = variable.basetime
interval = variable.interval
npatch = variable.patches
if base_time is None:
raise RuntimeError("Basetime must be set for TEXTE")
if interval is None:
raise RuntimeError("Interval must be set for TEXTE")
if not isinstance(times, (list, tuple)):
raise RuntimeError("times must be list or tuple")
values = np.array([])
times_read = np.array([])
end_of_line = self.geo.npoints * npatch
this_time = np.empty(self.geo.npoints * npatch)
tstep = 0
col = 0
for line in self.file.read().splitlines():
words = line.split()
if len(words) > 0:
for i, word in enumerate(words):
val = float(word.replace("D", "E"))
if val == 1e20:
val = np.nan
this_time[col] = val
col = col + 1
if col == end_of_line:
if (
times is None
or (base_time + as_timedelta(seconds=(tstep * interval)))
in times
):
values = np.append(values, this_time)
times_read = np.append(
times_read,
base_time + as_timedelta(seconds=(tstep * interval)),
)
tstep = tstep + 1
col = 0
this_time[:] = np.nan
if i != len(words) - 1:
raise RuntimeError(
"Dimension of domain does not match end of line! "
+ str(i)
+ " != "
+ str(len(words) - 1)
)
if times_read.shape[0] > 0:
values = np.asarray(values)
values = np.reshape(values, [times_read.shape[0], this_time.shape[0]])
else:
logging.info("No data found!")
self.file.close()
return values, self.geo
[docs] def field(self, var, validtime=None):
"""Read field.
Args:
var (SurfexFileVariable): Variable in surfex file.
validtime (datetime.datetime, optional): Valid time. Defaults to None.
Returns:
tuple: (np.array, surfex.Geometry)
"""
if validtime is not None:
validtime = [validtime]
field, geo_in = self.read(var, validtime)
# Reshape to fortran 2D style
field = np.reshape(field, [geo_in.nlons, geo_in.nlats], order="F")
return field, geo_in
[docs] def points(self, var, geo_out, validtime=None, interpolation="nearest"):
"""Read points.
Args:
var (SurfexFileVariable): Variable in surfex file.
geo_out (surfex.Geometry): Geometry in file.
validtime (datetime.datetime, optional): Valid time. Defaults to None.
interpolation (str, optional): Interpolation type. Defaults to "nearest".
Returns:
tuple: (np.array, surfex.Interpolator)
"""
field, geo_in = self.field(var, validtime=validtime)
points, interpolator = SurfexIO.interpolate_field(
field, geo_in, geo_out, interpolation=interpolation
)
return points, interpolator
[docs]class ForcingFileNetCDF(SurfexIO):
"""Forcing netCDF file."""
[docs] def __init__(self, fname, geo):
"""Construct forcing netcdf file.
Args:
fname (str): File name.
geo (surfex.Geo): Surfex input geometry.
"""
self.fname = fname
self.file_handler = netCDF4.Dataset(fname, "r")
self.lons = self.file_handler.variables["LON"]
self.lats = self.file_handler.variables["LAT"]
SurfexIO.__init__(self, fname, geo, "nc")
[docs] def read_field(self, variable, times):
"""Read file.
Args:
variable (_type_): _description_
times (_type_): _description_
Raises:
RuntimeError: You must set time!
RuntimeError: No points found
RuntimeError: Valid time not found in file!
Returns:
tuple: field, geo
"""
var = variable.varname
field = None
if var in self.file_handler.variables:
if self.file_handler.variables[var].shape[0] > 0:
if len(self.file_handler.variables[var].dimensions) == 1:
dimlen = self.file_handler.variables[var].shape[0]
field = self.file_handler.variables[var][0:dimlen]
else:
if len(times) == 0:
raise RuntimeError("You must set time!")
times_read = []
ndims = 0
npoints = 0
for dim in self.file_handler.variables[var].dimensions:
dimlen = self.file_handler.variables[var].shape[ndims]
if dim == "time":
times_for_var = self.file_handler.variables["time"]
units = times_for_var.units
try:
t_cal = times_for_var.calendar
except AttributeError: # Attribute doesn't exist
t_cal = "gregorian" # or standard
indices = list(range(0, dimlen))
times_for_var = netCDF4.num2date(
times_for_var[indices], units=units, calendar=t_cal
)
for times_to_read_val in times:
for tstep, times_for_var_val in enumerate(times_for_var):
test_time = times_for_var_val.strftime("%Y%m%d%H")
test_time = as_datetime(test_time)
if test_time == times_to_read_val:
times_read.append(tstep)
logging.debug("%s %s", tstep, times_to_read_val)
else:
npoints = dimlen
ndims = ndims + 1
if npoints == 0:
raise RuntimeError("No points found")
if len(times_read) == 0 and len(times) > 0:
logging.error("%s", times)
raise RuntimeError("Valid time not found in file!")
field = self.file_handler.variables[var][times_read, 0:npoints]
else:
logging.warning("Variable %s not found!", var)
return field, self.geo
[docs] def field(self, var, validtime=None):
"""Read field.
Args:
var (_type_): _description_
validtime (list, optional): Validtime. Defaults to None.
Returns:
tuple: field, geo
"""
if validtime is None:
validtime = []
else:
validtime = [validtime]
field, geo_in = self.read_field(var, validtime)
# Reshape to fortran 2D style
if field is not None:
field = np.reshape(field, [geo_in.nlons, geo_in.nlats], order="F")
return field, geo_in
[docs] def points(self, var, geo_out, validtime=None, interpolation=None):
"""Read points.
Args:
var (SurfexFileVariable): Variable in surfex file.
geo_out (surfex.Geo): Surfex geometry for points.
validtime (datetime.datetime, optional): Valid time. Defaults to None.
interpolation (str, optional): Interpolation type. Defaults to "nearest".
Returns:
tuple: Interpolated points, surfex.Interpolator
"""
field, geo_in = self.field(var, validtime=validtime)
points, interpolator = SurfexIO.interpolate_field(
field, geo_in, geo_out, interpolation=interpolation
)
return points, interpolator
def read_surfex_field(
varname,
filename,
validtime=None,
basetime=None,
patches=-1,
layers=-1,
fileformat=None,
filetype=None,
geo=None,
datatype=None,
interval=None,
tiletype="FULL",
):
"""Read surfex field.
Args:
varname (_type_): _description_
filename (_type_): _description_
validtime (_type_, optional): _description_. Defaults to None.
basetime (_type_, optional): _description_. Defaults to None.
patches (int, optional): _description_. Defaults to -1.
layers (int, optional): _description_. Defaults to -1.
fileformat (_type_, optional): _description_. Defaults to None.
filetype (_type_, optional): _description_. Defaults to None.
geo (_type_, optional): _description_. Defaults to None.
datatype (_type_, optional): _description_. Defaults to None.
interval (_type_, optional): _description_. Defaults to None.
tiletype(str, optional): Tiletype. Defaults to "FULL".
Raises:
RuntimeError: Not implemented and geo is None
RuntimeError: You need to provide a geo object.
Returns:
field (np.ndarray): Field
"""
if fileformat is None:
fileformat, filetype = guess_file_format(filename, filetype)
if filetype == "surf":
if fileformat.lower() == "ascii":
geo = AsciiSurfexFile(filename).geo
elif fileformat.lower() == "nc":
geo = NCSurfexFile(filename).geo
else:
if geo is None:
raise RuntimeError("Not implemented and geo is None")
elif geo is None:
raise RuntimeError(
"You need to provide a geo object. Filetype is: " + str(filetype)
)
sfx_io = get_surfex_io_object(
filename, filetype=filetype, fileformat=fileformat, geo=geo
)
var = SurfexFileVariable(
varname,
validtime=validtime,
patches=patches,
layers=layers,
basetime=basetime,
interval=interval,
datatype=datatype,
tiletype=tiletype,
)
field, __ = sfx_io.field(var, validtime=validtime)
return field
def read_surfex_points(
varname,
filename,
geo_out,
validtime=None,
basetime=None,
patches=-1,
layers=-1,
fileformat=None,
filetype=None,
geo=None,
datatype=None,
interval=None,
interpolation="nearest",
tiletype="FULL",
):
"""Read surfex points.
Args:
varname (str): Variable name.
filename (str): File name.
geo_out (surfex.Geo): Surfex geometry
validtime (datetime.datetime, optional): Valid time. Defaults to None.
basetime (datetime.datetime, optional): Base time. Defaults to None.
patches (int, optional): Number of patches. Defaults to -1.
layers (int, optional): Numbers of layers. Defaults to -1.
fileformat (str, optional): File format. Defaults to None.
filetype (str, optional): File type. Defaults to None.
geo (surfex.Geo, optional): Input geometry if needed. Defaults to None.
datatype (str, optional): Data type. Defaults to None.
interval (int, optional): Interval between times. Defaults to None.
interpolation (str, optional): Interpolation method. Defaults to "nearest".
tiletype(str, optional): Tiletype. Defaults to "FULL".
Raises:
NotImplementedError: _description_
RuntimeError: _description_
Returns:
np.darray: Field
"""
if fileformat is None:
fileformat, filetype = guess_file_format(filename, filetype)
if filetype == "surf":
if fileformat.lower() == "ascii":
geo = AsciiSurfexFile(filename).geo
elif fileformat.lower() == "nc":
geo = NCSurfexFile(filename).geo
else:
if geo is None:
raise NotImplementedError(
f"{fileformat} is not implemented and geo is None"
)
elif geo is None:
raise RuntimeError(
"You need to provide a geo object. Filetype is: " + str(filetype)
)
sfx_io = get_surfex_io_object(
filename, filetype=filetype, fileformat=fileformat, geo=geo
)
var = SurfexFileVariable(
varname,
validtime=validtime,
patches=patches,
layers=layers,
basetime=basetime,
interval=interval,
datatype=datatype,
tiletype=tiletype,
)
field, geo_out = sfx_io.points(
var, geo_out, validtime=validtime, interpolation=interpolation
)
return field