Source code for pysurfex.obs

"""obs."""
import json
import logging
import os

import numpy as np
import requests

try:
    import cfunits
except ModuleNotFoundError:
    cfunits = None
except AssertionError:
    cfunits = None
except:  # noqa
    cfunits = None
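
# cfunits is optional and only used by MetFrostObservations below when a unit
# conversion is requested. A minimal sketch of the conversion pattern
# (assuming cfunits is installed; the values are just an example):
#
#     value = cfunits.Units.conform(10.0, cfunits.Units("degC"), cfunits.Units("K"))
#     # value == 283.15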


from .datetime_utils import as_datetime, as_datetime_args, as_timedelta, utcfromtimestamp
from .observation import Observation
from .titan import dataset_from_file


class ObservationSet(object):
    """Set of observations."""

    def __init__(self, observations, label="", sigmao=None):
        """Create an observation set.

        Args:
            observations (list): Observation objects.
            label (str, optional): Name of set. Defaults to "".
            sigmao (float, optional): Observation error relative to normal
                background error. Defaults to None.

        """
        self.size = len(observations)
        self.label = label
        self.index_pos = {}
        self.index_stid = {}
        if sigmao is not None:
            logging.info(
                "Setting sigmao=%s for all observations for label=%s", sigmao, label
            )
            for obs in observations:
                obs.sigmao = sigmao
        self.observations = observations

    def get_stid_index(self, stid):
        """Get station ID index.

        Args:
            stid (str): Station ID.

        Returns:
            int: Found index

        """
        stid = str(stid)
        if stid in self.index_stid:
            return self.index_stid[stid]
        return None

    def get_pos_index(self, lon, lat):
        """Get position index.

        Args:
            lon (float): Longitude
            lat (float): Latitude

        Returns:
            int: Found position index.

        """
        lon = Observation.format_lon(lon)
        lat = Observation.format_lat(lat)
        pos = lon + ":" + lat
        if pos in self.index_pos:
            return self.index_pos[pos]
        return None

    def get_obs(self):
        """Get observations.

        Returns:
            (list, list, list, list, list, list, list, list): times, lons, lats,
                stids, elevs, values, varnames, sigmaos

        """
        obs2vectors = np.vectorize(Observation.obs2vectors)
        logging.debug("Obs dim %s", len(self.observations))
        if len(self.observations) > 0:
            times, lons, lats, stids, elevs, values, varnames, sigmaos = obs2vectors(
                self.observations
            )
            for point, lon in enumerate(lons):
                lon = Observation.format_lon(lon)
                lat = Observation.format_lat(lats[point])
                stid = str(stids[point])
                pos = lon + ":" + lat
                self.index_pos.update({pos: point})
                if stid != "NA":
                    self.index_stid.update({stid: point})
            times = times.tolist()
            lons = lons.tolist()
            lats = lats.tolist()
            stids = stids.tolist()
            elevs = elevs.tolist()
            values = values.tolist()
            varnames = varnames.tolist()
            sigmaos = sigmaos.tolist()
            return times, lons, lats, stids, elevs, values, varnames, sigmaos
        return [], [], [], [], [], [], [], []

    def matching_obs(self, my_obs):
        """Match the observations.

        Args:
            my_obs (Observation): Observation to match.

        Returns:
            bool: True if found

        """
        found = False
        for i in range(0, len(self.observations)):
            if my_obs.obstime == self.observations[i].obstime:
                lon = self.observations[i].lon
                lat = self.observations[i].lat
                pos = Observation.format_lon(lon) + ":" + Observation.format_lat(lat)
                if pos in self.index_pos:
                    found = True
        return found

    def points(self, geo, validtime=None):
        """Extract points from observations.

        Args:
            geo (surfex.Geo): Surfex geometry
            validtime (datetime.datetime, optional): Valid time. Defaults to None.

        Returns:
            tuple: times, values and station IDs in the points.

        """
        my_times = []
        my_values = []
        my_stids = []
        times, lons, lats, stids, __, values, __, __ = self.get_obs()
        for i in range(0, geo.nlons):
            lon = geo.lonlist[i]
            lat = geo.latlist[i]
            pos = Observation.format_lon(lon) + ":" + Observation.format_lat(lat)
            lons.append(lon)
            lats.append(lat)
            if pos in self.index_pos:
                ind = self.index_pos[pos]
                if validtime is not None:
                    logging.warning("No time check implemented yet")
                my_times.append(times[ind])
                my_stids.append(stids[ind])
                my_values.append(values[ind])
            else:
                my_times.append(None)
                my_stids.append("NA")
                my_values.append(np.nan)
                logging.info("Could not find position %s in this data source", pos)
        my_values = np.asanyarray(my_values)
        return my_times, my_values, my_stids

    def write_json_file(self, filename, indent=None):
        """Write a json file.

        Args:
            filename (str): Name of file
            indent (int, optional): Indentation in file. Defaults to None.

        """
        obs2vectors = np.vectorize(Observation.obs2vectors)
        data = {}
        if len(self.observations) > 0:
            obstimes, lons, lats, stids, elevs, values, varnames, sigmao = obs2vectors(
                self.observations
            )
            for obs, lon in enumerate(lons):
                data.update(
                    {
                        obs: {
                            "obstime": obstimes[obs].strftime("%Y%m%d%H%M%S"),
                            "varname": varnames[obs],
                            "lon": lon,
                            "lat": lats[obs],
                            "stid": stids[obs],
                            "elev": elevs[obs],
                            "value": values[obs],
                            "sigmao": sigmao[obs],
                        }
                    }
                )
        with open(filename, mode="w", encoding="utf-8") as file_handler:
            json.dump(data, file_handler, indent=indent)
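

# A minimal usage sketch for ObservationSet (illustrative only; the station,
# coordinates and values are made up, and the Observation keywords follow
# their use elsewhere in this module):
#
#     from pysurfex.observation import Observation
#     from pysurfex.datetime_utils import as_datetime
#
#     obs = [
#         Observation(
#             as_datetime("20230101060000"), 10.0, 60.0, 272.5,
#             stid="1001", elev=100.0, varname="airTemperature",
#         )
#     ]
#     obs_set = ObservationSet(obs, label="demo")
#     times, lons, lats, stids, elevs, values, varnames, sigmaos = obs_set.get_obs()
#     obs_set.write_json_file("demo_obs.json", indent=2)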


class NetatmoObservationSet(ObservationSet):
    """Observation set from netatmo."""

    def __init__(
        self,
        filenames,
        variable,
        target_time,
        dt=3600,
        re=True,
        lonrange=None,
        latrange=None,
        label="netatmo",
        sigmao=None,
    ):
        """Construct netatmo obs.

        Args:
            filenames (list): Filenames
            variable (str): Variable
            target_time (datetime.datetime): Time to match the observations against.
            dt (int, optional): Maximum allowed time difference in seconds.
                Defaults to 3600.
            re (bool, optional): Require elevation, i.e. remove stations
                without an altitude. Defaults to True.
            lonrange (list, optional): Allowed [min, max] longitude range.
                Defaults to None.
            latrange (list, optional): Allowed [min, max] latitude range.
                Defaults to None.
            label (str, optional): Name of set. Defaults to "netatmo".
            sigmao (float, optional): Observation error relative to normal
                background error. Defaults to None.

        Raises:
            RuntimeError: Lonrange must be a list with length 2
            RuntimeError: Latrange must be a list with length 2

        """
        if lonrange is None:
            lonrange = [-180, 180]
        if not isinstance(lonrange, list) or len(lonrange) != 2:
            raise RuntimeError(f"Lonrange must be a list with length 2 {lonrange}")
        if latrange is None:
            latrange = [-90, 90]
        if not isinstance(latrange, list) or len(latrange) != 2:
            raise RuntimeError(f"Latrange must be a list with length 2 {latrange}")

        data = {}  # key: id, value: list of values
        times = {}  # key: id, value: list of times
        metadata = {}

        # Parse the json data.
        #
        # The raw data is not valid JSON, since it is missing commas between
        # lists, e.g. [...][...][...]. Instead format it like this:
        # [..., ..., ...]
        observations = []
        num_missing_metadata = 0
        num_missing_obs = 0
        num_missing_time = 0
        num_missing_elev = 0
        num_wrong_time = 0
        for ifilename in filenames:
            with open(ifilename, mode="r", encoding="utf-8") as ifile:
                text = ifile.read()

            try:
                # Older files had an additional bug. This code will work for
                # both cases.
                if len(text) == 0:
                    # Sometimes netatmo files are empty
                    logging.info("Empty file: %s", ifilename)
                    continue

                if text[0] == "{":
                    text = f"[{text}"

                text = text.replace("}]{", "}{")
                text = text.replace("}][{", "},{")
                text = text.replace("}{", "},{")
                text = '{"data": %s}' % text
                raw = json.loads(text)
                raw = raw["data"]
                logging.debug("Parsing %d stations in %s", len(raw), ifilename)
            except json.JSONDecodeError:
                logging.error("Could not parse %s.", ifilename)
                continue

            for line in raw:
                if "data" in line and "_id" in line and "location" in line:
                    my_id = line["_id"]
                    location = line["location"]
                    curr_data = line["data"]
                    if variable in curr_data:
                        if "time_utc" in curr_data:
                            time_utc = curr_data["time_utc"]
                            # Record this observation if it is closer to the
                            # target time than any other previously parsed
                            # observation for this location. Also, only record
                            # if the time difference is within acceptable
                            # limits.
                            if "altitude" not in line:
                                num_missing_elev += 1

                            if not re or "altitude" in line:
                                lon = location[0]
                                lat = location[1]
                                if lonrange[0] <= lon <= lonrange[1]:
                                    if latrange[0] <= lat <= latrange[1]:
                                        if my_id not in data:
                                            data[my_id] = list()
                                            times[my_id] = list()
                                            elev = np.nan
                                            metadata[my_id] = {
                                                "lon": lon,
                                                "lat": lat,
                                                "elev": elev,
                                            }
                                        if (
                                            np.isnan(metadata[my_id]["elev"])
                                            and "altitude" in line
                                        ):
                                            metadata[my_id]["elev"] = line["altitude"]
                                        value = curr_data[variable]
                                        if variable == "Temperature":
                                            value = value + 273.15
                                        if variable == "Humidity":
                                            value = value * 0.01
                                        data[my_id] += [value]
                                        times[my_id] += [time_utc]
                        else:
                            num_missing_time += 1
                    else:
                        num_missing_obs += 1
                else:
                    num_missing_metadata += 1

        if target_time is not None:
            num_valid_stations = 0
            for my_id, time in times.items():
                this_diff_times = [
                    (utcfromtimestamp(t) - target_time).total_seconds() for t in time
                ]
                curr_times = [utcfromtimestamp(t) for t in time]
                if np.min(np.abs(np.array(this_diff_times))) < dt:
                    ibest = int(np.argmin(np.abs(np.array(this_diff_times))))
                    curr_time = curr_times[ibest]
                    elev = metadata[my_id]["elev"]
                    observations.append(
                        Observation(
                            curr_time,
                            metadata[my_id]["lon"],
                            metadata[my_id]["lat"],
                            data[my_id][ibest],
                            elev=elev,
                            varname=variable,
                        )
                    )
                    num_valid_stations += 1
        else:
            num_valid_stations = len(data)

        logging.info("Found %d valid observations:", num_valid_stations)
        logging.info("   %d missing obs", num_missing_obs)
        logging.info("   %d missing metadata", num_missing_metadata)
        logging.info("   %d missing timestamp", num_missing_time)
        logging.info("   %d wrong timestamp", num_wrong_time)
        if not re:
            extra = " (not removed)"
        else:
            extra = ""
        logging.debug("   %d missing elev%s", num_missing_elev, extra)
        ObservationSet.__init__(self, observations, label=label, sigmao=sigmao)


class MetFrostObservations(ObservationSet):
    """Observations from MET-Norway obs API (frost)."""

    def __init__(
        self,
        varname,
        stations=None,
        level=None,
        num_tries=3,
        wmo=None,
        providers=None,
        xproviders=None,
        blacklist=None,
        validtime=None,
        dt=3600,
        lonrange=None,
        latrange=None,
        unit=None,
        label="frost",
        sigmao=None,
    ):
        """Construct obs set from Frost.

        Args:
            varname (str): Element (variable) name to fetch.
            stations (list, optional): Only keep these station IDs.
                Defaults to None.
            level (dict, optional): Level filter matched against the
                observation level metadata. Defaults to None.
            num_tries (int, optional): Number of request retries.
                Defaults to 3.
            wmo (int, optional): If 1 keep only WMO stations, if 0 keep only
                non-WMO stations. Defaults to None.
            providers (str, optional): Comma-separated station holders to
                include. Defaults to None.
            xproviders (str, optional): Comma-separated station holders to
                exclude. Defaults to None.
            blacklist (list, optional): Station IDs to skip. Defaults to None.
            validtime (datetime.datetime, optional): Valid time.
                Defaults to None.
            dt (int, optional): Maximum allowed time difference in seconds.
                Defaults to 3600.
            lonrange (list, optional): Allowed [min, max] longitude range.
                Defaults to None.
            latrange (list, optional): Allowed [min, max] latitude range.
                Defaults to None.
            unit (str, optional): Unit to convert the values to (requires
                cfunits). Defaults to None.
            label (str, optional): Name of set. Defaults to "frost".
            sigmao (float, optional): Observation error relative to normal
                background error. Defaults to None.

        Raises:
            KeyError: CLIENTID not found in environment
            Exception: Could not retrieve stations
            Exception: Could not retrieve observations
            Exception: cfunits not loaded
            Exception: Did not read a unit to convert

        """
        if blacklist is None:
            blacklist = []

        # extract client ID from environment variable
        if "CLIENTID" not in os.environ:
            raise KeyError("error: CLIENTID not found in environment\n")
        client_id = os.environ["CLIENTID"]

        #
        # Get all the stations (in an area or country)
        # Make a list of station IDs and a dictionary of their lat, lon, elev
        #
        ids = []
        tries = 1
        station_dict = dict()
        while tries <= num_tries:
            tries += 1
            # First get all the stations within a polygon or in a country, e.g.
            #   'geometry': 'POLYGON((10 60, 10 59, 11 60, 11 59))'  # area around Oslo
            #   'country': 'NO'  # everything in Norway
            parameters = {
                "types": "SensorSystem",
                "fields": "id,geometry,masl,wmoid,stationholders",
            }
            if lonrange is not None and latrange is not None:
                parameters.update(
                    {
                        "geometry": "POLYGON(("
                        + str(lonrange[0]) + " " + str(latrange[0]) + ", "
                        + str(lonrange[0]) + " " + str(latrange[1]) + ", "
                        + str(lonrange[1]) + " " + str(latrange[1]) + ", "
                        + str(lonrange[1]) + " " + str(latrange[0]) + " ))"
                    }
                )

            logging.debug("Request parameters: %s", str(parameters))
            req = requests.get(
                "https://frost.met.no/sources/v0.jsonld",
                parameters,
                auth=(client_id, ""),
                timeout=30,
            )
            logging.debug(
                "Request https://frost.met.no/sources/v0.jsonld returned %s",
                req.status_code,
            )
            # extract list of stations (if response was valid)
            if req.status_code == 200:
                data = req.json()["data"]
                ids = []
                count_discard = 0
                for data_block in data:
                    my_id = data_block["id"]
                    if "masl" in data_block:
                        elev = data_block["masl"]
                    else:
                        elev = -999  # missing value

                    # filter data for WMO and non WMO
                    keep_this_id = True
                    if "wmoId" in data_block and wmo is not None and wmo == 0:
                        # station is WMO, skip
                        keep_this_id = False
                        logging.debug("throwing out this id (is WMO): %s", my_id)
                    elif "wmoId" not in data_block and wmo is not None and wmo == 1:
                        # station is not WMO, skip
                        keep_this_id = False
                        logging.debug("throwing out this id (not WMO): %s", my_id)

                    # filter out stations with incomplete data
                    if keep_this_id and "geometry" not in data_block:
                        keep_this_id = False
                        logging.debug("throwing out this id (no geometry): %s", my_id)

                    # filters for station holders
                    if "stationHolders" not in data_block:
                        keep_this_id = False
                        logging.debug(
                            "throwing out this id (no stationHolders): %s", my_id
                        )
                    # select station providers
                    elif providers is not None:
                        station_holders = data_block["stationHolders"]
                        if not any(x in station_holders for x in providers.split(",")):
                            keep_this_id = False
                            logging.debug(
                                "throwing out this id (station holder): %s",
                                str(station_holders),
                            )
                    # or exclude certain station providers
                    elif xproviders is not None:
                        station_holders = data_block["stationHolders"]
                        if any(x in station_holders for x in xproviders.split(",")):
                            keep_this_id = False
                            logging.debug(
                                "throwing out this id (exclude station holder): %s",
                                str(station_holders),
                            )

                    # filter out blacklisted stations
                    if my_id in blacklist:
                        keep_this_id = False
                        logging.debug("throwing out blacklisted id: %s", my_id)

                    if stations is not None:
                        if my_id not in stations:
                            keep_this_id = False
                            logging.debug(
                                "Throwing out station because not in station list %s",
                                my_id,
                            )

                    logging.debug(
                        "Keep this ID: %s bool: %s", str(my_id), str(keep_this_id)
                    )
                    if keep_this_id:
                        # write into dict
                        ids.append(my_id)
                        # store lat, lon, elev for each station
                        station_dict[my_id] = [
                            data_block["geometry"]["coordinates"][1],
                            data_block["geometry"]["coordinates"][0],
                            elev,
                        ]
                    else:
                        count_discard = count_discard + 1
                logging.debug("Number of stations: %s", str(len(ids)))
                logging.debug("Number of stations discarded: %s", str(count_discard))
                break
            if req.status_code == 404:
                logging.warning("STATUS: No data was found for the list of query Ids.")
                break
        if tries > num_tries:
            raise Exception("ERROR: could not retrieve stations.")

        #
        # Use the station ID list to get the observation for each station
        #
        ids_obs_dict = {}  # declare outside loop, since there may be more than one request

        # check how long the list of stations is and potentially break it up
        # into several shorter requests
        observations = []
        it_ids = len(ids)
        dt = as_timedelta(seconds=dt)
        while it_ids > 0:
            if it_ids > 50:
                # get the last 50
                sub_id_list = ids[it_ids - 50 : it_ids]
                it_ids = it_ids - 50
            else:
                # get the rest if < 50
                sub_id_list = ids[:it_ids]
                it_ids = 0

            tries = 1
            while tries <= num_tries:
                tries += 1
                # use the list of stations and get the observations for those
                parameters2 = {"sources": ",".join(sub_id_list), "elements": varname}
                date = None
                hour = None
                if validtime is not None:
                    date = validtime.strftime("%Y%m%d")
                    hour = validtime.strftime("%H")
                # if a date and time have been specified
                if date is not None and hour is not None:
                    # convert to a format that works for FROST
                    date_string_frost = (
                        date[0:4] + "-" + date[4:6] + "-" + date[6:8] + "T" + hour
                    )
                    parameters2["referencetime"] = date_string_frost
                # no date and time, so use the latest observation
                else:
                    parameters2["referencetime"] = "latest"
                    parameters2["maxage"] = "PT30M"
                    parameters2["limit"] = 1

                logging.debug("Request parameters2: %s", str(parameters2))
                req = requests.get(
                    "https://frost.met.no/observations/v0.jsonld",
                    parameters2,
                    auth=(client_id, ""),
                    timeout=30,
                )
                logging.debug(
                    "Request https://frost.met.no/observations/v0.jsonld returned %s",
                    req.status_code,
                )
                if req.status_code == 200:
                    data = req.json()["data"]
                    for data_block in data:
                        # Check that the reference time is ok, since sometimes
                        # future observations can be present when 'latest' is
                        # chosen for the reference time
                        ref_str = data_block["referenceTime"]
                        ref_year = int(ref_str[0:4])
                        ref_month = int(ref_str[5:7])
                        ref_day = int(ref_str[8:10])
                        ref_hour = int(ref_str[11:13])
                        ref_min = int(ref_str[14:16])
                        ref_sec = int(ref_str[17:19])
                        ref_time = as_datetime_args(
                            year=ref_year,
                            month=ref_month,
                            day=ref_day,
                            hour=ref_hour,
                            minute=ref_min,
                            second=ref_sec,
                        )
                        logging.debug("ref_time %s validtime %s", ref_time, validtime)

                        read_unit = None
                        levels_ok = True
                        if "observations" in data_block:
                            for obs in data_block["observations"]:
                                logging.debug("%s", obs)
                                if "unit" in obs:
                                    read_unit = obs["unit"]
                                if "level" in obs:
                                    if level is not None:
                                        logging.debug("level %s", obs["level"])
                                        all_found = True
                                        for key in level:
                                            if key in obs["level"]:
                                                if str(level[key]) != str(
                                                    obs["level"][key]
                                                ):
                                                    logging.debug(
                                                        "%s != %s",
                                                        level[key],
                                                        obs["level"][key],
                                                    )
                                                    all_found = False
                                                else:
                                                    logging.debug(
                                                        "%s == %s",
                                                        level[key],
                                                        obs["level"][key],
                                                    )
                                        if not all_found:
                                            levels_ok = False

                        keep_this_obs = False
                        if levels_ok:
                            if abs(ref_time - validtime) < dt:
                                keep_this_obs = True

                        if keep_this_obs:
                            logging.debug("Keep this obs")
                            value = data_block["observations"][0]["value"]
                            # not all stations have observations
                            if len(str(value)) > 0:
                                source_id = str(data_block["sourceId"])
                                my_id = source_id.split(":")
                                if unit is not None:
                                    if read_unit is not None:
                                        if cfunits is None:
                                            raise Exception("cfunits not loaded!")
                                        value = cfunits.Units.conform(
                                            value,
                                            cfunits.Units(read_unit),
                                            cfunits.Units(unit),
                                        )
                                    else:
                                        raise Exception(
                                            "Did not read a unit to convert!"
                                        )
                                ids_obs_dict[my_id[0]] = value

                    logging.debug(
                        "Station list length: %s, total number of observations "
                        "retrieved: %s",
                        str(len(sub_id_list)),
                        str(len(ids_obs_dict)),
                    )
                    break
                if req.status_code == 404:
                    logging.warning(
                        "STATUS: No data was found for the list of query Ids."
                    )
                    break
            if tries > num_tries:
                raise Exception("ERROR: could not retrieve observations.")

        for station, obs_value in ids_obs_dict.items():
            value = float(obs_value)
            id_info = station_dict[station]
            stid = str(station)[2:]  # drop the leading source prefix (e.g. "SN")
            lat = id_info[0]
            lon = id_info[1]
            elev = id_info[2]
            observations.append(
                Observation(
                    validtime,
                    lon,
                    lat,
                    value,
                    stid=str(stid),
                    elev=elev,
                    varname=varname,
                )
            )

        ObservationSet.__init__(self, observations, label=label, sigmao=sigmao)
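

# A minimal usage sketch for MetFrostObservations (illustrative only; it needs
# a valid Frost client ID in the CLIENTID environment variable, and the
# element name and ranges below are just examples):
#
#     import os
#     from pysurfex.datetime_utils import as_datetime
#
#     os.environ["CLIENTID"] = "your-frost-client-id"
#     obs_set = MetFrostObservations(
#         "air_temperature",
#         validtime=as_datetime("20230101060000"),
#         lonrange=[5.0, 15.0],
#         latrange=[55.0, 65.0],
#         unit="K",
#     )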


class JsonObservationSet(ObservationSet):
    """JSON observation set."""

    def __init__(self, filename, label="json", var=None, sigmao=None):
        """Construct an observation data set from a json file.

        Args:
            filename (str): Filename
            label (str, optional): Label of set. Defaults to "json".
            var (str, optional): Variable name. Defaults to None.
            sigmao (float, optional): Observation error relative to normal
                background error. Defaults to None.

        Raises:
            RuntimeError: Varname is not found

        """
        with open(filename, mode="r", encoding="utf-8") as file_handler:
            obs = json.load(file_handler)

        observations = []
        for i in range(0, len(obs)):
            ind = str(i)
            obstime = as_datetime(obs[ind]["obstime"])
            lon = obs[ind]["lon"]
            lat = obs[ind]["lat"]
            elev = obs[ind]["elev"]
            value = obs[ind]["value"]
            stid = obs[ind]["stid"]
            if sigmao is not None:
                lsigmao = sigmao
            else:
                try:
                    lsigmao = obs[ind]["sigmao"]
                except KeyError:
                    lsigmao = 1.0
            varname = ""
            if "varname" in obs[ind]:
                varname = obs[ind]["varname"]
            if varname == "" and var is not None:
                raise RuntimeError(f"Varname {var} is not found")
            if var is None or var == varname:
                observations.append(
                    Observation(
                        obstime,
                        lon,
                        lat,
                        value,
                        stid=stid,
                        elev=elev,
                        varname=varname,
                        sigmao=lsigmao,
                    )
                )
        ObservationSet.__init__(self, observations, label=label, sigmao=sigmao)
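

# A minimal usage sketch for JsonObservationSet (illustrative only; the file
# path is hypothetical). The expected file layout matches what
# ObservationSet.write_json_file() produces, e.g.:
#
#     {"0": {"obstime": "20230101060000", "varname": "airTemperature",
#            "lon": 10.0, "lat": 60.0, "stid": "1001", "elev": 100.0,
#            "value": 272.5, "sigmao": 1.0}}
#
#     obs_set = JsonObservationSet("demo_obs.json", var="airTemperature")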


class ObservationFromTitanJsonFile(ObservationSet):
    """Observation set from titan json file."""

    def __init__(self, an_time, filename, label="", sigmao=None):
        """Construct obs set from a titan json file.

        Args:
            an_time (datetime.datetime): Analysis time.
            filename (str): Name of the titan json file.
            label (str, optional): Name of set. Defaults to "".
            sigmao (float, optional): Observation error relative to normal
                background error. Defaults to None.

        """
        qc_obs = dataset_from_file(an_time, filename)
        observations = []
        for i in range(0, len(qc_obs)):
            observations.append(
                Observation(
                    qc_obs.obstimes[i],
                    qc_obs.lons[i],
                    qc_obs.lats[i],
                    qc_obs.values[i],
                    elev=qc_obs.elevs[i],
                    sigmao=qc_obs.epsilons[i],
                )
            )
        ObservationSet.__init__(self, observations, label=label, sigmao=sigmao)
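

# A minimal usage sketch for ObservationFromTitanJsonFile (illustrative only;
# the file path is hypothetical):
#
#     from pysurfex.datetime_utils import as_datetime
#
#     obs_set = ObservationFromTitanJsonFile(
#         as_datetime("20230101060000"), "qc_obs_20230101T06.json", label="titan"
#     )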