"""Grib treatment."""
import logging
import numpy as np
import pyproj
try:
import eccodes
import gribapi
except ImportError:
eccodes = None
gribapi = None
except RuntimeError:
eccodes = None
gribapi = None
# Needed in Python 3.5
except: # noqa
eccodes = None
gribapi = None
from .geo import ConfProj, Geo, LonLatReg
from .interpolation import Interpolation
[docs]class Grib(object):
"""Grib class."""
[docs] def __init__(self, fname):
"""Construct grib object.
Args:
fname (str): File name.
"""
self.fname = fname
self.projection = None
self.lons = None
self.lats = None
self.nearest = None
self.linear = None
logging.debug("Grib constructor")
[docs] def field(self, gribvar, time):
"""Read field in grib file.
Args:
gribvar (GribVariable1/2): Grib variable to read.
time (datetime.datetime): Valid time to read.
Returns:
np.ndarray: Field
Raises:
NotImplementedError: NotImplementedError
RuntimeError: If eccodes not available
"""
if eccodes is None:
raise RuntimeError("eccodes not found. Needed for reading grib files")
logging.debug("Look for %s", gribvar.generate_grib_id())
field = None
geo_out = None
with open(self.fname, mode="rb") as file_handler:
while 1:
gid = eccodes.codes_grib_new_from_file(file_handler)
if gid is None:
logging.warning("Could not find key")
gribvar.print_keys()
file_handler.close()
return field, geo_out
else:
if gribvar.matches(gid):
values = self.read_field_in_message(gid, time)
logging.debug("read values = %s", values)
grid_type = str(eccodes.codes_get(gid, "gridType"))
logging.debug("grid_type=%s", grid_type)
if grid_type.lower() == "rotated_ll":
geo_keys = [
"Ni",
"Nj",
"latitudeOfFirstGridPointInDegrees",
"longitudeOfFirstGridPointInDegrees",
"latitudeOfLastGridPointInDegrees",
"longitudeOfLastGridPointInDegrees",
"iDirectionIncrementInDegrees",
"jDirectionIncrementInDegrees",
"latitudeOfSouthernPoleInDegrees",
"longitudeOfSouthernPoleInDegrees",
"iScansNegatively",
"jScansPositively",
]
geo_info = self.read_geo_info(gid, geo_keys)
n_x = geo_info["Ni"]
n_y = geo_info["Nj"]
ll_lon = geo_info["longitudeOfFirstGridPointInDegrees"]
ll_lat = geo_info["latitudeOfFirstGridPointInDegrees"]
dlon = geo_info["iDirectionIncrementInDegrees"]
dlat = geo_info["jDirectionIncrementInDegrees"]
sp_lon = geo_info["longitudeOfSouthernPoleInDegrees"]
iscan = geo_info["iScansNegatively"]
jscan = geo_info["jScansPositively"]
if sp_lon < -180.0:
sp_lon = sp_lon + 360.0
elif sp_lon > 180.0:
sp_lon = sp_lon - 360.0
sp_lat = -1 * geo_info["latitudeOfSouthernPoleInDegrees"]
earth = 6.371229e6
proj_string = (
f"+proj=ob_tran +o_proj=longlat +o_lat_p={sp_lat}"
f" +R={str(earth)} +no_defs"
)
logging.info(proj_string)
logging.info("ll_lon=%s ll_lat=%s", ll_lon, ll_lat)
logging.info("polon=%s polat=%s", sp_lon, sp_lat)
logging.info("dlon=%s dlat=%s", dlon, dlat)
logging.info("iscan=%s jscan=%s", iscan, jscan)
proj = pyproj.CRS.from_string(proj_string)
wgs84 = pyproj.CRS.from_string("EPSG:4326")
lons = []
for i in range(0, n_x):
if int(iscan) == 1:
lon = ll_lon - (float(i) * dlon)
else:
lon = ll_lon + (float(i) * dlon)
if lon < -180.0:
lon = lon + 360.0
elif lon > 180.0:
lon = lon - 360.0
lons.append(lon)
lats = []
for j in range(0, n_y):
if int(jscan) == 1:
lat = ll_lat + (float(j) * dlat)
else:
lat = ll_lat - (float(j) * dlat)
if lat > 90.0:
lat = lat - 90.0
elif lat < -90.0:
lat = lat + 90.0
lats.append(lat)
lons = np.array(lons)
lats = np.array(lats)
longitudes, latitudes = np.meshgrid(lons, lats, indexing="ij")
lons, lats = pyproj.Transformer.from_crs(
proj, wgs84, always_xy=True
).transform(longitudes, latitudes)
lons = lons + sp_lon
field = np.reshape(values, [n_x, n_y], order="F")
if geo_out is None:
geo_out = Geo(lons, lats)
elif grid_type.lower() == "regular_ll":
geo_keys = [
"Ni",
"Nj",
"latitudeOfFirstGridPointInDegrees",
"longitudeOfFirstGridPointInDegrees",
"latitudeOfLastGridPointInDegrees",
"longitudeOfLastGridPointInDegrees",
"iDirectionIncrementInDegrees",
"jDirectionIncrementInDegrees",
]
geo_info = self.read_geo_info(gid, geo_keys)
n_x = geo_info["Ni"]
n_y = geo_info["Nj"]
lon0 = geo_info["longitudeOfFirstGridPointInDegrees"]
lat0 = geo_info["latitudeOfFirstGridPointInDegrees"]
d_x = geo_info["iDirectionIncrementInDegrees"]
d_y = geo_info["jDirectionIncrementInDegrees"]
lons = []
lats = []
for i in range(0, n_x):
lons.append(lon0 + (float(i) * d_x))
for j in range(0, n_y):
lats.append(lat0 - (float(j) * d_y))
lon1 = lons[-1]
lat1 = lats[-1]
lons = np.array(lons)
lats = np.array(lats)
lons, lats = np.meshgrid(lons, lats)
field = np.reshape(values, [n_x, n_y], order="F")
if geo_out is None:
domain = {
"nam_lonlat_reg": {
"xlonmin": lon0,
"xlonmax": lon1,
"xlatmin": lat0,
"xlatmax": lat1,
"nlon": n_x,
"nlat": n_y,
}
}
geo_out = LonLatReg(domain)
elif grid_type.lower() == "lambert":
geo_keys = [
"Nx",
"Ny",
"latitudeOfFirstGridPointInDegrees",
"longitudeOfFirstGridPointInDegrees",
"LoVInDegrees",
"DxInMetres",
"DyInMetres",
"iScansNegatively",
"jScansPositively",
"jPointsAreConsecutive",
"Latin1InDegrees",
"LaDInDegrees",
"Latin2InDegrees",
"latitudeOfSouthernPoleInDegrees",
"longitudeOfSouthernPoleInDegrees",
]
geo_info = self.read_geo_info(gid, geo_keys)
n_x = geo_info["Nx"]
n_y = geo_info["Ny"]
lon0 = geo_info["LoVInDegrees"]
lat0 = geo_info["LaDInDegrees"]
ll_lon = geo_info["longitudeOfFirstGridPointInDegrees"]
ll_lat = geo_info["latitudeOfFirstGridPointInDegrees"]
d_x = geo_info["DxInMetres"]
d_y = geo_info["DyInMetres"]
earth = 6.37122e6
proj_string = (
f"+proj=lcc +lat_0={str(lat0)} +lon_0={str(lon0)} "
f"+lat_1={str(lat0)} +lat_2={str(lat0)} "
f"+units=m +no_defs +R={str(earth)}"
)
proj = pyproj.CRS.from_string(proj_string)
wgs84 = pyproj.CRS.from_string("EPSG:4326")
x_0, y_0 = pyproj.Transformer.from_crs(
wgs84, proj, always_xy=True
).transform(ll_lon, ll_lat)
x_c = x_0 + 0.5 * (n_x - 1) * d_x
y_c = y_0 + 0.5 * (n_y - 1) * d_y
lonc, latc = pyproj.Transformer.from_crs(
proj, wgs84, always_xy=True
).transform(x_c, y_c)
# TODO we should investigate scan angle and if done correctly
# order should probaly be "C" and not "F"
field = np.reshape(values, [n_x, n_y], order="F")
if geo_out is None:
domain = {
"nam_conf_proj": {"xlon0": lon0, "xlat0": lat0},
"nam_conf_proj_grid": {
"xloncen": lonc,
"xlatcen": latc,
"nimax": n_x,
"njmax": n_y,
"xdx": d_x,
"xdy": d_y,
"ilone": 0,
"ilate": 0,
},
}
geo_out = ConfProj(domain)
else:
raise NotImplementedError(
str(grid_type) + " not implemented yet!"
)
eccodes.codes_release(gid)
if geo_out is None:
raise RuntimeError("No geometry is found in file")
return field, geo_out
eccodes.codes_release(gid)
[docs] @staticmethod
def read_geo_info(gid, keys):
"""Set geo keys in dict.
Args:
gid (int): grib id
keys (dict): Geometry keys
Returns:
dict: Geometry
"""
geo_dict = {}
for key in keys:
try:
logging.debug(" %s: %s", key, eccodes.codes_get(gid, key))
geo_dict.update({key: eccodes.codes_get(gid, key)})
except eccodes.KeyValueNotFoundError as err:
logging.debug(' Key="%s" was not found: %s', key, err.msg)
except eccodes.CodesInternalError as err:
logging.error('Error with key="%s" : %s', key, err.msg)
return geo_dict
[docs] @staticmethod
def read_field_in_message(gid, time):
"""Get field.
Args:
gid (int): grib id
time(str): time
Returns:
Field: Field object
"""
try:
logging.debug(
"There are %d values, average is %f, min is %f, max is %f",
eccodes.codes_get_size(gid, "values"),
eccodes.codes_get(gid, "average"),
eccodes.codes_get(gid, "min"),
eccodes.codes_get(gid, "max"),
)
field = eccodes.codes_get_values(gid)
# TODO Check time consistency
logging.info("Grib record found is hopefullly valid for time %s", str(time))
has_bitmap = 0
try:
has_bitmap = int(eccodes.codes_get(gid, "bitmapPresent"))
except eccodes.KeyValueNotFoundError as err:
logging.debug(' Key="bitmapPresent" was not found: %s', err.msg)
except eccodes.CodesInternalError as err:
logging.error('Error with key="bitmapPresent" : %s', err.msg)
if has_bitmap == 1:
missing_value = eccodes.codes_get(gid, "missingValue")
field[field == missing_value] = np.nan
return field
except eccodes.KeyValueNotFoundError as err:
logging.debug(' Key="missingValue" was not found: %s', err.msg)
except eccodes.CodesInternalError as err:
logging.error('Error with key="missingValue" : %s', err.msg)
return None
[docs] def points(self, gribvar, geo, validtime=None, interpolation="bilinear"):
"""Read a 2-D field and interpolates it to requested positions.
Args:
gribvar (GribVariable1/2): Grib variable
geo (surfex.Geo): Surfex geometry
validtime (datetime.datetime, optional): Valid time. Defaults to None.
interpolation (str, optional): Interpolation method. Defaults to "bilinear".
Returns:
np.array: vector with interpolated values
"""
field, geo_in = self.field(gribvar, validtime)
interpolator = Interpolation(interpolation, geo_in, geo)
field = interpolator.interpolate(field)
return field, interpolator
class Grib1Variable:
"""Grib1 variable."""
def __init__(self, par, typ, level, tri=0):
"""Construct grib1 variable.
Args:
par (int): IndicatorOfParameter
typ (int): TypeOfLevel
level (int): NumberOFLevel
tri (int, optional): TimeRangeIndicator. Defaults to 0.
"""
self.version = 1
self.par = int(par)
self.typ = int(typ)
self.level = int(level)
self.tri = int(tri)
def is_accumulated(self):
"""Check if accumulated."""
if self.tri == 4:
logging.debug("Value is accumulated")
return True
else:
return False
def matches(self, gid):
"""Check if matchees.
Args:
gid (_type_): _description_
Returns:
bool: True if found
Raises:
RuntimeError: If eccodes not found
"""
if eccodes is None:
raise RuntimeError("eccodes not found. Needed for reading grib files")
version = int(eccodes.codes_get(gid, "editionNumber"))
if version == 1:
par = int(eccodes.codes_get_long(gid, "indicatorOfParameter"))
lev = int(eccodes.codes_get_long(gid, "level"))
typ = int(eccodes.codes_get_long(gid, "levelType"))
tri = int(eccodes.codes_get_long(gid, "timeRangeIndicator"))
logging.debug("Checking grib1 record: %s %s %s %s", par, typ, lev, tri)
logging.debug(self.generate_grib_id())
if (
self.par == par
and self.level == lev
and self.typ == typ
and self.tri == tri
):
logging.debug(
"Found matching grib1 record: %s %s %s %s", par, lev, typ, tri
)
return True
else:
return False
else:
logging.warning("Record is not grib1")
return False
def print_keys(self):
"""Print keys."""
print("\n")
print("Version:", self.version)
print("indicatorOfParameter:", self.par)
print("levelType", self.typ)
print("level:", self.level)
print("timeRangeIndicator:", self.tri)
def generate_grib_id(self):
"""Generate grib1 ID."""
par = self.par
typ = self.typ
level = self.level
tri = self.tri
return f":grib1:{str(par)}:{str(typ)}:{str(level)}:{str(tri)}"
[docs]class Grib2Variable(object):
"""Grib2 variable."""
[docs] def __init__(self, discipline, pca, pnr, typ, lev, tsp=-1):
"""Construct a grib2 variable.
Args:
discipline (int): discipline
pca (int): parameterCatergory
pnr (int): parameterNumber
typ (int): levelType
lev (int): level
tsp (int, optional): typeOfStatisticalProcessing. Defaults to -1.
"""
self.version = 2
self.discipline = int(discipline)
self.parameter_category = int(pca)
self.parameter_number = int(pnr)
self.level_type = int(typ)
self.level = int(lev)
self.type_of_statistical_processing = int(tsp)
[docs] def matches(self, gid):
"""Check if matches.
Args:
gid (int): Grib ID
Raises:
ModuleNotFoundError: If not eccodes found
Returns:
bool: True if matches
"""
if eccodes is None:
raise ModuleNotFoundError("eccodes not found. Needed for reading grib files")
version = int(eccodes.codes_get(gid, "editionNumber"))
if version == 2:
discipline = int(eccodes.codes_get(gid, "discipline"))
parameter_category = int(eccodes.codes_get(gid, "parameterCategory"))
parameter_number = int(eccodes.codes_get(gid, "parameterNumber"))
level_type = int(eccodes.codes_get_long(gid, "levelType"))
level = int(eccodes.codes_get(gid, "level"))
try:
type_of_statistical_processing = eccodes.codes_get(
gid, "typeOfStatisticalProcessing"
)
type_of_statistical_processing = int(type_of_statistical_processing)
except gribapi.errors.KeyValueNotFoundError:
type_of_statistical_processing = -1
logging.debug(
"Checking grib2 record: %s %s %s %s %s %s",
discipline,
parameter_category,
parameter_number,
level_type,
level,
type_of_statistical_processing,
)
logging.debug("grib2 ID: %s", self.generate_grib_id())
if (
self.discipline == discipline
and self.parameter_category == parameter_category
and self.parameter_number == parameter_number
and self.level_type == level_type
and self.level == level
and self.type_of_statistical_processing == type_of_statistical_processing
):
logging.debug(
"Found matching grib2 record: %s %s %s %s %s",
discipline,
parameter_category,
parameter_number,
level_type,
level,
)
return True
else:
return False
else:
logging.warning("Record is not grib2")
return False
[docs] def is_accumulated(self):
"""Check if accumulated."""
if self.type_of_statistical_processing == 1:
logging.debug("Parameter is accumulated")
return True
else:
return False
[docs] def print_keys(self):
"""Print keys."""
print("\n")
print("Version:", self.version)
print("discipline:", self.discipline)
print("parameterCategory:", self.parameter_category)
print("parameterNumber:", self.parameter_number)
print("levelType:", self.level_type)
print("level:", self.level)
print("typeOfStatisticalProcessing:", self.type_of_statistical_processing)
[docs] def generate_grib_id(self):
"""Generate grib2 ID."""
dis = str(self.discipline)
pca = str(self.parameter_category)
pnr = str(self.parameter_number)
typ = str(self.level_type)
lev = str(self.level)
tsp = str(self.type_of_statistical_processing)
return f":grib2:{dis}:{pca}:{pnr}:{typ}:{lev}:{tsp}"