"""bufr treatment."""
import logging
from math import exp
import numpy as np
try:
import eccodes # type: ignore
except ImportError:
eccodes = None
logging.warning("ECCODES not found. Needed for bufr reading")
except RuntimeError:
eccodes = None
logging.warning("ECCODES not found. Needed for bufr reading")
# Needed in Python 3.5
except Exception:
logging.warning("Could not load eccodes")
eccodes = None
from .datetime_utils import as_datetime_args
from .obs import ObservationSet
from .observation import Observation
[docs]class BufrObservationSet(ObservationSet):
"""Create observation data set from bufr observations."""
def __init__(
self,
bufrfile,
variables,
valid_dtg,
valid_range,
lonrange=None,
latrange=None,
label="bufr",
use_first=False,
sigmao=None,
):
"""Initialize a bufr observation set.
Args:
bufrfile (str): Full path of the bufr file to read
variables(list): Variables to read
valid_dtg (datetime.datetime): DTG string with valid time
valid_range (datetime.timedelta): The allowed time range
lonrange (list): Allowed range of longitudes [min, max]
latrange (list): Allowed range of latitides [min, max]
label (str): A label for the resulting observations set
use_first (bool): Use only the first valid observation for a point if more are found
sigmao (float, optional): Observation error relative to normal background error. Defaults to None.
Raises:
RuntimeError: ECCODES not found. Needed for bufr reading
NotImplementedError: Not implemented
"""
if lonrange is None:
lonrange = [-180, 180]
if latrange is None:
latrange = [-90, 90]
if eccodes is None:
raise RuntimeError("ECCODES not found. Needed for bufr reading")
logging.debug(eccodes.__file__)
# open bufr file
file_handler = open(bufrfile, mode="rb")
number_of_bytes = file_handler.seek(0, 2)
logging.info("File size: %s", number_of_bytes)
file_handler.seek(0)
# define the keys to be printed
keys = [
"latitude",
"localLatitude",
"longitude",
"localLongitude",
"year",
"month",
"day",
"hour",
"minute",
"heightOfStationGroundAboveMeanSeaLevel",
"heightOfStation",
"stationNumber",
"blockNumber",
]
processed_threshold = 0
nerror = {}
ntime = {}
nundef = {}
ndomain = {}
nobs = {}
for var in variables:
if var == "relativeHumidityAt2M":
keys.append("airTemperatureAt2M")
keys.append("dewpointTemperatureAt2M")
keys.append(
"/heightOfSensorAboveLocalGroundOrDeckOfMarinePlatform=2"
"/airTemperature"
)
keys.append(
"/heightOfSensorAboveLocalGroundOrDeckOfMarinePlatform=1.5"
"/airTemperature"
)
keys.append(
"/heightOfSensorAboveLocalGroundOrDeckOfMarinePlatform=2"
"/dewpointTemperature"
)
keys.append(
"/heightOfSensorAboveLocalGroundOrDeckOfMarinePlatform=1.5"
"/dewpointTemperature"
)
keys.append(
"/heightOfSensorAboveLocalGroundOrDeckOfMarinePlatform=2"
"/relativeHumidity"
)
elif var == "airTemperatureAt2M":
keys.append("airTemperatureAt2M")
keys.append(
"/heightOfSensorAboveLocalGroundOrDeckOfMarinePlatform=2"
"/airTemperature"
)
keys.append(
"/heightOfSensorAboveLocalGroundOrDeckOfMarinePlatform=1.5"
"/airTemperature"
)
else:
keys.append(var)
nerror.update({var: 0})
ntime.update({var: 0})
nundef.update({var: 0})
ndomain.update({var: 0})
nobs.update({var: 0})
logging.info("Reading %s", bufrfile)
logging.info("Looking for keys: %s", str(keys))
cnt = 0
observations = list()
# loop for the messages in the file
not_decoded = 0
registry = {}
while 1:
# get handle for message
bufr = eccodes.codes_bufr_new_from_file(file_handler)
if bufr is None:
break
# we need to instruct ecCodes to expand all the descriptors
# i.e. unpack the data values
try:
eccodes.codes_set(bufr, "unpack", 1)
decoded = True
except eccodes.CodesInternalError as err:
not_decoded = not_decoded + 1
logging.error('Error with key="unpack" : %s', err.msg)
decoded = False
# print the values for the selected keys from the message
if decoded:
lat = np.nan
local_lat = np.nan
lon = np.nan
local_lon = np.nan
value = np.nan
elev = np.nan
year = -1
month = -1
day = -1
hour = -1
minute = -1
stid = "NA"
station_number = -1
block_number = -1
site_name = "NA"
t2m = np.nan
rh2m = np.nan
td2m = np.nan
s_d = np.nan
temp = np.nan
t_d = np.nan
c_b = np.nan
for key in keys:
try:
logging.debug("Decode: %s", key)
val = eccodes.codes_get(bufr, key)
logging.debug("Got: %s=%s", key, val)
if (
val == eccodes.CODES_MISSING_DOUBLE
or val == eccodes.CODES_MISSING_LONG
):
val = np.nan
if key == "latitude":
lat = val
if lat < -90 or lat > 90:
lat = np.nan
if key == "longitude":
lon = val
if lon < -180 or lon > 180:
lon = np.nan
if key == "localLatitude":
local_lat = val
if local_lat < -90 or local_lat > 90:
local_lat = np.nan
if key == "localLongitude":
local_lon = val
if local_lon < -180 or local_lon > 180:
local_lon = np.nan
if key == "year":
year = val
if key == "month":
month = val
if key == "day":
day = val
if key == "hour":
hour = val
if key == "minute":
minute = val
if key == "heightOfStation":
if not np.isnan(val):
elev = val
if key == "heightOfStationGroundAboveMeanSeaLevel":
if not np.isnan(val):
elev = val
if key == "stationNumber":
station_number = val
if key == "blockNumber":
block_number = val
if key == "stationOrSiteName":
site_name = str(val)
if key == "airTemperatureAt2M":
t2m = val
if (
key
== "/heightOfSensorAboveLocalGroundOrDeckOfMarinePlatform=2"
"/airTemperature"
or key
== "/heightOfSensorAboveLocalGroundOrDeckOfMarinePlatform=1.5"
"/airTemperature"
):
temp = val
if (
key
== "/heightOfSensorAboveLocalGroundOrDeckOfMarinePlatform=2"
"/relativeHumidity"
):
rh2m = val
if key == "dewpointTemperatureAt2M":
td2m = val
if (
key
== "/heightOfSensorAboveLocalGroundOrDeckOfMarinePlatform=2"
"/dewpointTemperature"
or key
== "/heightOfSensorAboveLocalGroundOrDeckOfMarinePlatform=1.5"
"/dewpointTemperature"
):
t_d = val
if key == "totalSnowDepth":
s_d = val
if key == "heightOfBaseOfCloud":
c_b = val
except eccodes.CodesInternalError:
logging.debug('Report does not contain key="%s"', key)
got_pos = True
if np.isnan(lat):
if np.isnan(local_lat):
got_pos = False
else:
lat = local_lat
if np.isnan(lon):
if np.isnan(local_lon):
got_pos = False
else:
lon = local_lon
if got_pos:
# Assign value to var
pos = f"{lon:.5f}:{lat:.5f}"
for var in variables:
exists = False
if use_first:
if pos in registry:
if var in registry[pos]:
exists = registry[pos][var]
else:
registry.update({pos: {}})
if not exists:
logging.debug("Pos does not exist %s %s", pos, var)
if var == "relativeHumidityAt2M":
if (
not np.isnan(t2m)
and not np.isnan(td2m)
and np.isnan(rh2m)
):
try:
value = self.td2rh(td2m, t2m)
value = value * 0.01
except Exception:
logging.debug("Got exception for %s:", var)
value = np.nan
elif (
not np.isnan(temp)
and not np.isnan(t_d)
and np.isnan(rh2m)
):
try:
value = self.td2rh(t_d, temp)
value = value * 0.01
except Exception:
logging.debug(
"Got exception for %s",
var,
)
value = np.nan
else:
value = np.nan
if np.isnan(value) and not np.isnan(rh2m):
value = 0.01 * rh2m
elif var == "airTemperatureAt2M":
if np.isnan(t2m):
if not np.isnan(temp):
value = temp
else:
value = t2m
elif var == "totalSnowDepth":
value = s_d
elif var == "heightOfBaseOfCloud":
value = c_b
elif var == "stationOrSiteName":
site_name = site_name
else:
raise NotImplementedError(
f"Var {var} is not coded! Please do it!"
)
else:
logging.debug("Pos already exists %s %s", pos, var)
all_found = True
if np.isnan(lat):
all_found = False
if np.isnan(lon):
all_found = False
if year == -1:
all_found = False
if month == -1:
all_found = False
if day == -1:
all_found = False
if hour == -1:
all_found = False
if minute == -1:
all_found = False
if np.isnan(elev):
all_found = False
if np.isnan(value):
all_found = False
if not all_found:
nerror.update({var: nerror[var] + 1})
logging.debug(
"Check on position in space and time %s %s %s %s %s %s",
lon,
lonrange[0],
lonrange[1],
lat,
latrange[0],
latrange[1],
)
if (
latrange[0] <= lat <= latrange[1]
and lonrange[0] <= lon <= lonrange[1]
):
obs_dtg = None
try:
obs_dtg = as_datetime_args(
year=year,
month=month,
day=day,
hour=hour,
minute=minute,
)
except ValueError:
logging.warning(
"Bad observations time: year=%s month=%s day=%s hour=%s minute=%s Position is lon=%s, lat=%s",
year,
month,
day,
hour,
minute,
lon,
lat,
)
obs_dtg = None
if not np.isnan(value) and obs_dtg is not None:
if self.inside_window(obs_dtg, valid_dtg, valid_range):
logging.debug(
"Valid DTG for station %s %s %s %s %s %s %s %s",
obs_dtg,
valid_dtg,
valid_range,
lon,
lat,
value,
elev,
stid,
)
if station_number > 0 and block_number > 0:
stid = str((block_number * 1000) + station_number)
if (
stid == "NA"
and site_name != "NA"
and site_name.isnumeric()
):
stid = site_name
observations.append(
Observation(
obs_dtg,
lon,
lat,
value,
elev=elev,
stid=stid,
varname=var,
)
)
if use_first:
registry[pos].update({var: True})
nobs.update({var: nobs[var] + 1})
else:
ntime.update({var: ntime[var] + 1})
else:
nundef.update({var: nundef[var] + 1})
else:
ndomain.update({var: ndomain[var] + 1})
cnt += 1
try:
nbytes = file_handler.tell()
except ValueError:
nbytes = number_of_bytes
processed = int(round(float(nbytes) * 100.0 / float(number_of_bytes)))
if processed > processed_threshold and processed % 5 == 0:
processed_threshold = processed
logging.info("Read: %s%%", processed)
# delete handle
eccodes.codes_release(bufr)
logging.info("Found %s/%s", str(len(observations)), str(cnt))
logging.info("Not decoded: %s", str(not_decoded))
for var in variables:
logging.info("Observations for var=%s: %s", var, str(nobs[var]))
logging.info(
"Observations removed because of domain check: %s", str(ndomain[var])
)
logging.info(
"Observations removed because of not being defined/found: %s",
str(nundef[var]),
)
logging.info(
"Observations removed because of time window: %s", str(ntime[var])
)
logging.info(
"Messages not containing information on all keys: %s", str(nerror[var])
)
# close the file
file_handler.close()
ObservationSet.__init__(self, observations, label=label, sigmao=sigmao)
[docs] @staticmethod
def td2rh(t_d, temp, kelvin=True):
"""Convert dew point to temperature.
Args:
t_d (float): Dew point temperature
temp (float): Temperature
kelvin (bool, optional): Kelvin. Defaults to True.
Raises:
RuntimeError: Dew point temperature is probably not Kelvin
RuntimeError: Temperature is probably not Kelvin
Returns:
float: Relative humidity (percent)
"""
if kelvin:
if t_d < 100:
raise RuntimeError("Dew point temperature is probably not Kelvin")
if temp < 100:
raise RuntimeError("Temperature is probably not Kelvin")
t_d = t_d - 273.15
temp = temp - 273.15
r_h = 100 * (
exp((17.625 * t_d) / (243.04 + t_d)) / exp((17.625 * temp) / (243.04 + temp))
)
if r_h > 110 or r_h < 1:
logging.warning(
"\nWARNING: Calculated rh to %s from %s and %s. Set it to missing",
str(r_h),
str(t_d),
str(temp),
)
r_h = np.nan
elif r_h > 100:
logging.warning(
"\nWARNING: Calculated rh to %s from %s and %s.",
str(r_h),
str(t_d),
str(temp) + " Truncate to 100%",
)
r_h = 100
return r_h
[docs] @staticmethod
def inside_window(obs_dtg, valid_dtg, valid_range):
"""Check if inside window.
Args:
obs_dtg (as_datetime): Observation datetime
valid_dtg (as_datetime): Valid datetime
valid_range (as_timedelta): Window
Returns:
bool: True if inside window
"""
if valid_dtg is None:
return True
else:
if (valid_dtg - valid_range) <= obs_dtg <= (valid_dtg + valid_range):
return True
else:
return False