"""Geometry."""
import json
import logging
import math
import os
from abc import ABC, abstractmethod
import numpy as np
import pyproj
try:
from osgeo import ogr # type: ignore
except Exception:
ogr = None
from .namelist_legacy import BaseNamelist
[docs]class Geo(object):
"""Geometry."""
[docs] def __init__(self, lons, lats):
"""Construct geometry.
Args:
lons (np.ndarray): Longitudes
lats (np.ndarray): Latitudes
Raises:
Exception: _description_
"""
can_interpolate = False
if not isinstance(lons, np.ndarray) or not isinstance(lats, np.ndarray):
raise Exception("Longitudes and latitudes must be numpy nd arrays")
ndims = len(lons.shape)
self.ndims = ndims
if len(lons.shape) != len(lats.shape):
raise Exception("Mismatch in lat and lon")
if len(lons.shape) > 1 and len(lats.shape) > 1:
can_interpolate = True
logging.debug("ndims=%s, can_interpolate=%s", ndims, can_interpolate)
nlons = lons.shape[0]
nlats = lats.shape[0]
if ndims > 1:
nlats = lats.shape[1]
if ndims > 1:
npoints = nlons * nlats
else:
npoints = nlats
self.npoints = npoints
self.nlons = nlons
self.nlats = nlats
self.lonrange = [np.min(lons), np.max(lons)]
self.latrange = [np.min(lats), np.max(lats)]
self.lons = lons
self.lonlist = lons.flatten()
self.lats = lats
self.latlist = lats.flatten()
logging.debug("lonlist=%s", self.lonlist)
logging.debug("latlist=%s", self.latlist)
logging.debug("nlons=%s nlats=%s", nlons, nlats)
logging.debug("lons=%s shape=%s", lons, lons.shape)
logging.debug("lats=%s shape=%s", lats, lats.shape)
self.can_interpolate = can_interpolate
[docs] def identifier(self):
"""Create identifier."""
f_lon = ""
l_lon = ""
if self.lonrange is not None:
f_lon = str(round(float(self.lonrange[0]), 2))
l_lon = str(round(float(self.lonrange[-1]), 2))
f_lat = ""
l_lat = ""
if self.latrange is not None:
f_lat = str(round(float(self.latrange[0]), 2))
l_lat = str(round(float(self.latrange[-1]), 2))
tag = (
":"
+ str(self.npoints)
+ ":"
+ str(self.nlons)
+ ":"
+ str(self.nlats)
+ ":"
+ f_lon
+ ":"
+ l_lon
+ ":"
+ f_lat
+ ":"
+ l_lat
+ ":"
)
tag = tag.replace(" ", "")
logging.debug("TAG: %s", tag)
return tag
[docs] def is_identical(self, geo_to_check):
"""Check if geometries are identical.
Args:
geo_to_check (surfex.Geo): Geo to check.
Returns:
bool: True if identical.
"""
if self.identifier() == geo_to_check.identifier():
logging.debug("Geometries are identical")
return True
else:
return False
[docs] def write_proj_info(self):
"""Write proj info.
Returns:
None: Do nothing for now.
"""
return None
[docs]class SurfexGeo(ABC, Geo):
"""Abstract surfex geometry class.
Args:
ABC (_type_): _description_
Geo (_type_): _description_
"""
[docs] def __init__(self, proj, lons, lats):
"""Construct a surfex geometry.
Args:
proj (str): Proj string
lons (np.ndarray): Lons
lats (np.ndarray): Lats
"""
self.mask = None
self.proj = proj
Geo.__init__(self, lons, lats)
[docs] @abstractmethod
def update_namelist(self, nml):
"""Update namelist.
Args:
nml (_type_): _description_
Returns:
_type_: _description_
"""
raise NotImplementedError
[docs] @abstractmethod
def subset(self, geo):
"""Find subset of geo.
Args:
geo (surfex.Geo): Geometry to check.
"""
raise NotImplementedError
[docs]class ConfProj(SurfexGeo):
"""Conf proj."""
[docs] def __init__(self, from_json):
"""Construct conf proj geo.
Args:
from_json (dict): Domain definition
Raises:
KeyError: Missing keys1
KeyError: Missing keys2
KeyError: Missing keys3
KeyError: Missing keys4
"""
self.cgrid = "CONF PROJ"
self.json = from_json
domain_dict = BaseNamelist.lower_case_namelist_dict(from_json)
logging.debug("from_json: %s", from_json)
self.ilone = None
self.ilate = None
self.xtrunc = None
if "nam_conf_proj_grid" in domain_dict:
if (
"nimax"
and "njmax"
and "xloncen"
and "xlatcen"
and "xdx"
and "xdy" in domain_dict["nam_conf_proj_grid"]
):
self.nimax = domain_dict["nam_conf_proj_grid"]["nimax"]
self.njmax = domain_dict["nam_conf_proj_grid"]["njmax"]
self.xloncen = domain_dict["nam_conf_proj_grid"]["xloncen"]
self.xlatcen = domain_dict["nam_conf_proj_grid"]["xlatcen"]
self.xdx = domain_dict["nam_conf_proj_grid"]["xdx"]
self.xdy = domain_dict["nam_conf_proj_grid"]["xdy"]
if "ilone" in domain_dict["nam_conf_proj_grid"]:
self.ilone = domain_dict["nam_conf_proj_grid"]["ilone"]
if "ilate" in domain_dict["nam_conf_proj_grid"]:
self.ilate = domain_dict["nam_conf_proj_grid"]["ilate"]
if "xtrunc" in domain_dict["nam_conf_proj_grid"]:
self.xtrunc = domain_dict["nam_conf_proj_grid"]["xtrunc"]
else:
raise KeyError("Missing keys1")
else:
raise KeyError("Missing key2")
if "nam_conf_proj" in domain_dict:
if "xlon0" and "xlat0" in domain_dict["nam_conf_proj"]:
self.xlon0 = domain_dict["nam_conf_proj"]["xlon0"]
self.xlat0 = domain_dict["nam_conf_proj"]["xlat0"]
else:
raise KeyError("Missing keys3")
else:
raise KeyError("Missing key4")
earth = 6.37122e6
# Work-around for SP cases where projection info in FA header is wrong
self.xlat0 = float("{:.5f}".format(self.xlat0))
self.xlon0 = float("{:.5f}".format(self.xlon0))
if self.xlat0 == 90.0 or self.xlat0 == -90.0:
proj_string = (
f"+proj=stere +lat_0={str(self.xlat0)} +lon_0={str(self.xlon0)} "
f"+lat_ts={str(self.xlat0)}"
)
else:
proj_string = (
f"+proj=lcc +lat_0={str(self.xlat0)} +lon_0={str(self.xlon0)} "
f"+lat_1={str(self.xlat0)} +lat_2={str(self.xlat0)} "
f"+units=m +no_defs +R={str(earth)}"
)
logging.debug("Proj string: %s", proj_string)
proj = pyproj.CRS.from_string(proj_string)
wgs84 = pyproj.CRS.from_string("EPSG:4326")
xloncen, xlatcen = pyproj.Transformer.from_crs(
wgs84, proj, always_xy=True
).transform(self.xloncen, self.xlatcen)
x_0 = float(xloncen) - (0.5 * ((float(self.nimax) - 1.0) * self.xdx))
y_0 = float(xlatcen) - (0.5 * ((float(self.njmax) - 1.0) * self.xdy))
self.x_0 = x_0
self.y_0 = y_0
xxx = np.empty([self.nimax])
yyy = np.empty([self.njmax])
# TODO vectorize
for i in range(0, self.nimax):
xxx[i] = x_0 + (float(i) * self.xdx)
for j in range(0, self.njmax):
yyy[j] = y_0 + (float(j) * self.xdy)
self.xxx = xxx
self.yyy = yyy
y_v, x_v = np.meshgrid(yyy, xxx)
logging.debug("x_v.shape=%s y_v.shape=%s", x_v.shape, y_v.shape)
lons, lats = pyproj.Transformer.from_crs(proj, wgs84, always_xy=True).transform(
x_v, y_v
)
logging.debug("lons.shape=%s lats.shape=%s", lons.shape, lats.shape)
logging.debug("lons.shape=%s", lons)
logging.debug("lats.shape=%s", lats)
SurfexGeo.__init__(self, proj, lons, lats)
[docs] def update_namelist(self, nml):
"""Update namelist.
Args:
nml (f90nml.Namelist): Namelist object.
Returns:
nml (f90nml.Namelist): Namelist object.
"""
nml.update(
{
"nam_pgd_grid": {"cgrid": self.cgrid},
"nam_conf_proj": {
"xlon0": self.xlon0,
"xlat0": self.xlat0,
"xrpk": math.sin(math.radians(self.xlat0)),
"xbeta": 0,
},
"nam_conf_proj_grid": {
"xlatcen": self.xlatcen,
"xloncen": self.xloncen,
"nimax": self.nimax,
"njmax": self.njmax,
"xdx": self.xdx,
"xdy": self.xdy,
},
}
)
if self.ilone is not None:
nml["nam_conf_proj_grid"].update({"ilone": self.ilone})
if self.ilate is not None:
nml["nam_conf_proj_grid"].update({"ilate": self.ilate})
if self.xtrunc is not None:
nml["nam_conf_proj_grid"].update({"xtrunc": self.xtrunc})
return nml
[docs] def subset(self, geo):
"""Find subset of geo.
Args:
geo (surfex.Geo): Geometry to check.
Returns:
(list, list): lons, lats
"""
lons = []
lats = []
if hasattr(geo, "cgrid") and geo.cgrid == self.cgrid:
is_subset = True
if self.xlon0 != geo.xlon0 or self.xlat0 != geo.xlat0:
is_subset = False
if self.xdx != geo.xdx:
is_subset = False
if self.xdy != geo.xdy:
is_subset = False
if self.nimax > geo.nimax:
is_subset = False
if self.njmax > geo.njmax:
is_subset = False
if is_subset:
logging.info("Grids have same projection and grid spacing")
x_0 = None
y_0 = None
for i in range(0, geo.nimax):
if round(self.x_0, 4) == round(geo.xxx[i], 4):
x_0 = i
break
for j in range(0, geo.njmax):
if round(self.y_0, 4) == round(geo.yyy[j], 4):
y_0 = j
break
if x_0 is not None and y_0 is not None:
logging.info(
"Grid is a subset of input grid %s %s", str(x_0), str(y_0)
)
lons = np.arange(x_0, x_0 + self.nimax, 1).tolist()
lats = np.arange(y_0, y_0 + self.njmax, 1).tolist()
return lons, lats
[docs]class LonLatVal(SurfexGeo):
"""LonLatVal."""
[docs] def __init__(self, from_json):
"""Construct a LonLatVal geometry.
Used also for points/observations.
Args:
from_json (dict): Domain description,
Raises:
KeyError: Missing key
KeyError: Missing keys
"""
self.cgrid = "LONLATVAL"
self.json = from_json
domain_dict = BaseNamelist.lower_case_namelist_dict(from_json)
if "nam_lonlatval" in domain_dict:
if "xx" and "xy" and "xdx" and "xdy" in domain_dict["nam_lonlatval"]:
self.x_x = domain_dict["nam_lonlatval"]["xx"]
self.x_y = domain_dict["nam_lonlatval"]["xy"]
self.xdx = domain_dict["nam_lonlatval"]["xdx"]
self.xdy = domain_dict["nam_lonlatval"]["xdy"]
proj4 = "+proj=longlat +datum=WGS84 +no_defs +ellps=WGS84"
proj = pyproj.CRS.from_string(proj4)
SurfexGeo.__init__(self, proj, np.asarray(self.x_x), np.asarray(self.x_y))
self.can_interpolate = False
else:
raise KeyError("Missing keys")
else:
raise KeyError("Missing key")
[docs] def update_namelist(self, nml):
"""Update namelist.
Args:
nml (f90nml.Namelist): Namelist object.
Returns:
nml (f90nml.Namelist): Namelist object.
"""
nml.update(
{
"nam_pgd_grid": {"cgrid": self.cgrid},
"nam_lonlatval": {
"xx": self.x_x,
"xy": self.x_y,
"xdx": self.xdx,
"xdy": self.xdy,
},
}
)
return nml
[docs] def subset(self, geo):
"""Find subset of geo.
Args:
geo (surfex.Geo): Geometry to check.
Returns:
tuple: lons, lats
"""
logging.info("Subset not implemented")
lons = []
lats = []
return lons, lats
[docs]class Cartesian(SurfexGeo):
"""Cartesian."""
[docs] def __init__(self, from_json):
"""Construct Cartesian geometry.
Args:
from_json (_type_): _description_
Raises:
KeyError: Missing key
KeyError: Missing keys
"""
self.cgrid = "CARTESIAN"
self.json = from_json
domain_dict = BaseNamelist.lower_case_namelist_dict(from_json)
if "nam_cartesian" in domain_dict:
if (
"xlat0"
and "xlon0"
and "nimax"
and "njmax"
and "xdx"
and "xdy" in domain_dict["nam_cartesian"]
):
self.xlat0 = domain_dict["nam_cartesian"]["xlat0"]
self.xlon0 = domain_dict["nam_cartesian"]["xlon0"]
self.nimax = domain_dict["nam_cartesian"]["nimax"]
self.njmax = domain_dict["nam_cartesian"]["njmax"]
self.xdx = domain_dict["nam_cartesian"]["xdx"]
self.xdy = domain_dict["nam_cartesian"]["xdy"]
proj = None
lons = []
lats = []
for i in range(0, self.nimax):
lons.append(self.xlon0 + i * self.xdx)
for j in range(0, self.njmax):
lats.append(self.xlat0 + j * self.xdy)
SurfexGeo.__init__(self, proj, np.asarray(lons), np.asarray(lats))
else:
raise KeyError("Missing keys")
else:
raise KeyError("Missing key")
[docs] def update_namelist(self, nml):
"""Update namelist.
Args:
nml (f90nml.Namelist): Namelist object.
Returns:
nml (f90nml.Namelist): Namelist object.
"""
print(nml)
nml.update(
{
"nam_pgd_grid": {"cgrid": self.cgrid},
"nam_cartesian": {
"xlat0": self.xlat0,
"xlon0": self.xlon0,
"nimax": self.nimax,
"njmax": self.njmax,
"xdx": self.xdx,
"xdy": self.xdy,
},
}
)
return nml
[docs] def subset(self, geo):
"""Find subset of geo.
Args:
geo (surfex.Geo): Geometry to check.
Returns:
tuple: lons, lats
"""
logging.info("Subset not implemented")
lons = []
lats = []
return lons, lats
[docs]class LonLatReg(SurfexGeo):
"""LonLatReg."""
[docs] def __init__(self, from_json):
"""Construct the LonLatReg geometry.
Args:
from_json (dict): Domain definition.
Raises:
KeyError: Missing key
KeyError: Missing keys
ZeroDivisionError: nlon and/or nlat is 0
"""
self.cgrid = "LONLAT REG"
self.json = from_json
domain_dict = BaseNamelist.lower_case_namelist_dict(from_json)
if "nam_lonlat_reg" in domain_dict:
if (
"xlonmin"
and "xlonmax"
and "xlatmin"
and "xlatmax"
and "nlon"
and "nlat" in domain_dict["nam_lonlat_reg"]
):
self.xlonmin = domain_dict["nam_lonlat_reg"]["xlonmin"]
self.xlonmax = domain_dict["nam_lonlat_reg"]["xlonmax"]
self.xlatmin = domain_dict["nam_lonlat_reg"]["xlatmin"]
self.xlatmax = domain_dict["nam_lonlat_reg"]["xlatmax"]
self.nlon = domain_dict["nam_lonlat_reg"]["nlon"]
self.nlat = domain_dict["nam_lonlat_reg"]["nlat"]
else:
raise KeyError("Missing keys")
else:
raise KeyError("Missing key")
proj_string = "+proj=longlat +datum=WGS84 +no_defs +ellps=WGS84"
proj = pyproj.CRS.from_string(proj_string)
lons = []
lats = []
if self.nlon == 0 or self.nlat == 0:
raise ZeroDivisionError("nlon and/or nlat is 0")
dlon = (self.xlonmax - self.xlonmin) / (self.nlon - 1)
dlat = (self.xlatmax - self.xlatmin) / (self.nlat - 1)
logging.debug("nlon=%s nlat=%s dlon=%s dlat=%s", self.nlon, self.nlat, dlon, dlat)
for i in range(0, self.nlon):
lons.append(self.xlonmin + i * dlon)
for j in range(0, self.nlat):
lats.append(self.xlatmin + j * dlat)
# proj, npoints, nlons, nlats, lons, lats
latitudes, longitudes = np.meshgrid(lats, lons)
logging.debug("longitudes=%s latitudes=%s", longitudes.shape, latitudes.shape)
SurfexGeo.__init__(self, proj, longitudes, latitudes)
[docs] def update_namelist(self, nml):
"""Update namelist.
Args:
nml (f90nml.Namelist): Namelist object.
Returns:
nml (f90nml.Namelist): Namelist object.
"""
nml.update(
{
"nam_pgd_grid": {"cgrid": self.cgrid},
"nam_lonlat_reg": {
"xlonmin": self.xlonmin,
"xlonmax": self.xlonmax,
"xlatmin": self.xlatmin,
"xlatmax": self.xlatmax,
"nlon": self.nlon,
"nlat": self.nlat,
},
}
)
return nml
[docs] def subset(self, geo):
"""Find subset of geo.
Args:
geo (surfex.Geo): Geometry to check.
Returns:
tuple: lons, lats
"""
logging.info("Subset not implemented")
lons = []
lats = []
return lons, lats
[docs]class IGN(SurfexGeo):
"""IGN."""
[docs] def __init__(self, from_json, recreate=False):
"""Construct a IGN geometry.
Args:
from_json (dict): Domain definition.
recreate (bool, optional): Recreate the cached mask. Defaults to False.
Raises:
NotImplementedError: Projection not implemented
KeyError: Missing key
KeyError: Missing keys
"""
self.cgrid = "IGN"
self.json = from_json
domain_dict = BaseNamelist.lower_case_namelist_dict(from_json)
if "nam_ign" in domain_dict:
if (
"clambert"
and "npoints"
and "xx"
and "xy"
and "xdx"
and "xdy"
and "xx_llcorner"
and "xy_llcorner"
and "xcellsize"
and "ncols"
and "nrows" in domain_dict["nam_ign"]
):
self.clambert = domain_dict["nam_ign"]["clambert"]
npoints = domain_dict["nam_ign"]["npoints"]
self.x_x = domain_dict["nam_ign"]["xx"]
self.x_y = domain_dict["nam_ign"]["xy"]
self.xdx = domain_dict["nam_ign"]["xdx"]
self.xdy = domain_dict["nam_ign"]["xdy"]
self.xx_llcorner = domain_dict["nam_ign"]["xx_llcorner"]
self.xy_llcorner = domain_dict["nam_ign"]["xy_llcorner"]
self.xcellsize = domain_dict["nam_ign"]["xcellsize"]
self.ncols = domain_dict["nam_ign"]["ncols"]
self.nrows = domain_dict["nam_ign"]["nrows"]
else:
raise KeyError("Missing keys")
else:
raise KeyError("Missing key")
if self.clambert == 7:
proj4 = (
"+proj=lcc +lat_0=63.5 +lon_0=15.0 +lat_1=63.5 +lat_2=63.5 "
"+no_defs +R=6.37122e+6"
)
self.xloncen = 17
self.xlatcen = 63.0
self.xlon0 = 15
self.xlat0 = 63.5
else:
raise NotImplementedError
proj = pyproj.CRS.from_string(proj4)
wgs84 = pyproj.CRS.from_string("EPSG:4326")
pxall = self.get_coord(self.x_x, self.xdx, "x", recreate)
pyall = self.get_coord(self.x_y, self.xdy, "y", recreate)
self.mask = self.ign_mask(pxall, pyall, self.x_x, self.x_y, recreate)
# proj, npoints, nlons, nlats, lons, lats
lons = []
lats = []
for i in range(0, npoints):
lon, lat = pyproj.Transformer.from_crs(proj, wgs84, always_xy=True).transform(
self.x_x[i], self.x_y[i]
)
lons.append(lon)
lats.append(lat)
SurfexGeo.__init__(self, proj, np.asarray(lons), np.asarray(lats))
[docs] @staticmethod
def get_coord(pin, pdin, coord, recreate=False):
"""Get the IGN coordinates.
Args:
pin (list): _description_
pdin (list): _description_
coord (list): _description_
recreate (bool, optional): _description_. Defaults to False.
Returns:
list: Output coordinates
"""
pout = []
cache = "/tmp/." + coord + "_cached" # noqa S108
if os.path.isfile(cache) and not recreate:
with open(cache, mode="r", encoding="utf-8") as file_handler:
cached_coord = file_handler.read().splitlines()
for c_coord in cached_coord:
pout.append(float(c_coord))
return pout
zdout = []
ksize = 0
if len(pin) > 0:
zdout.append(float(pdin[0]) / 2.0)
pout.append(pin[0])
ksize = 1
if len(pin) > 1:
ksize = 2
pout.append(pin[0] - pdin[0])
zdout.append(0.0)
if len(pin) > 2:
ksize = 3
pout.append(pin[0] + pdin[0])
zdout.append(0.0)
for i, pinval in enumerate(pin):
for j in range(0, ksize):
if pout[j] == pinval:
break
if j == ksize - 1:
ksize = ksize + 1
pout.append(pinval)
zdout.append(float(pdin[i]) / 2.0)
# Mesh constrains
for j in range(0, ksize):
if pout[j] < pin[i] and (pout[j] + zdout[j]) >= (pin[i] - pdin[i]):
break
if j == ksize - 1:
ksize = ksize + 1
pout.append(pin[i] - pdin[i])
zdout.append(0.0)
for j in range(0, ksize):
if pout[j] > pin[i] and (pout[j] - zdout[j]) <= (pin[i] + pdin[i]):
break
if j == ksize - 1:
ksize = ksize + 1
pout.append(pin[i] + pdin[i])
zdout.append(0.0)
# Sort pout
pout = sorted(pout)
with open(cache, mode="w", encoding="utf-8") as file_handler:
for pout_val in pout:
file_handler.write(str(pout_val) + "\n")
logging.info("Cached coordinates for : %s", coord)
return pout
[docs] @staticmethod
def ign_mask(pxall, pyall, xxx, yyy, recreate):
"""Create the IGN mask.
Args:
pxall (_type_): _description_
pyall (_type_): _description_
xxx (_type_): _description_
yyy (_type_): _description_
recreate (_type_): _description_
Raises:
Exception: _description_
Returns:
_type_: _description_
"""
mask = []
cache = "/tmp/.mask" # noqa S108
if os.path.isfile(cache) and not recreate:
with open(cache, mode="r", encoding="utf-8") as file_handler:
cached_mask = file_handler.read().splitlines()
for cached_mask_ind in cached_mask:
mask.append(int(cached_mask_ind))
if len(mask) != len(xxx) or len(mask) != len(yyy):
raise Exception("Cached mask mismatch! ", len(mask), len(xxx), len(yyy))
return mask
logging.warning("Creating mask. This takes time:")
count = -1
for i, pxall_val in enumerate(pxall):
for pyall_val in pyall:
count = count + 1
for k, xval in enumerate(xxx):
if xval == pxall_val and yyy[k] == pyall_val:
mask.append(count)
break
logging.debug("%s/%s", i, len(pxall))
# Cache mask for later use
with open(cache, mode="w", encoding="utf-8") as file_handler:
for mask_ind in mask:
file_handler.write(str(mask_ind) + "\n")
logging.info("Created mask: %s", mask)
return mask
[docs] def update_namelist(self, nml):
"""Update namelist.
Args:
nml (f90nml.Namelist): Namelist object.
Returns:
nml (f90nml.Namelist): Namelist object.
"""
nml.update(
{
"nam_pgd_grid": {"cgrid": self.cgrid},
"nam_ign": {
"clambert": self.clambert,
"npoints": self.npoints,
"xx": self.x_x,
"xy": self.x_y,
"xdx": self.xdx,
"xdy": self.xdy,
"xx_llcorner": self.xx_llcorner,
"xy_llcorner": self.xy_llcorner,
"xcellsize": self.xcellsize,
"ncols": self.ncols,
"nrows": self.nrows,
},
}
)
return nml
[docs] def subset(self, geo):
"""Find subset of geo.
Args:
geo (surfex.Geo): Geometry to check.
Returns:
tuple: lons, lats
"""
logging.info("Subset not implemented")
lons = []
lats = []
return lons, lats
[docs]def get_geo_object(from_json):
"""Get a surfex geometry object from a dictionary.
Args:
from_json (dict): Domain definition.
Raises:
NotImplementedError: Grid not implemented
KeyError: Missing grid information cgrid
KeyError: nam_pgd_grid not set!
Returns:
surfex.Geo: Surfex geometry.
"""
domain_dict = {}
for key in from_json:
lower_case_dict = {}
for key2 in from_json[key]:
lower_case_dict.update({key2.lower(): from_json[key][key2]})
domain_dict.update({key.lower(): lower_case_dict})
if "nam_pgd_grid" in domain_dict:
if "cgrid" in domain_dict["nam_pgd_grid"]:
cgrid = domain_dict["nam_pgd_grid"]["cgrid"]
if cgrid == "CONF PROJ":
return ConfProj(from_json)
if cgrid == "LONLATVAL":
return LonLatVal(from_json)
if cgrid == "LONLAT REG":
return LonLatReg(from_json)
if cgrid == "IGN":
return IGN(from_json)
if cgrid == "CARTESIAN":
return Cartesian(from_json)
raise NotImplementedError(f"CGRID={cgrid} is not implemented")
raise KeyError("Missing grid information cgrid")
raise KeyError("nam_pgd_grid not set!")
[docs]def set_domain(settings, domain, hm_mode=False):
"""Set domain.
Args:
settings (dict): Domain definitions
domain (str): Domain name
hm_mode (bool, optional): Harmonie definition. Defaults to False.
Raises:
KeyError: Domain not found
ValueError: Settings should be a dict
Returns:
dict: Domain dictionary
"""
if isinstance(settings, dict):
if domain in settings:
if hm_mode:
ezone = 11
if "EZONE" in settings[domain]:
ezone = settings[domain]["EZONE"]
domain_dict = {
"nam_pgd_grid": {"cgrid": "CONF PROJ"},
"nam_conf_proj": {
"xlat0": settings[domain]["LAT0"],
"xlon0": settings[domain]["LON0"],
},
"nam_conf_proj_grid": {
"ilone": ezone,
"ilate": ezone,
"xlatcen": settings[domain]["LATC"],
"xloncen": settings[domain]["LONC"],
"nimax": settings[domain]["NLON"] - ezone,
"njmax": settings[domain]["NLAT"] - ezone,
"xdx": settings[domain]["GSIZE"],
"xdy": settings[domain]["GSIZE"],
},
}
else:
domain_dict = settings[domain]
return domain_dict
logging.error("Domain not found: %s", domain)
raise KeyError("Domain not found: " + domain)
logging.error("Settings should be a dict")
raise ValueError("Settings should be a dict")
[docs]def shape2ign(catchment, infile, output, ref_proj, indent=None):
"""Read a shape file and convert to IGN geo.
Args:
catchment (_type_): _description_
infile (_type_): _description_
output (_type_): _description_
ref_proj (_type_): _description_
indent (_type_, optional): _description_. Defaults to None.
"""
from_json = json.load(open(ref_proj, mode="r", encoding="utf-8"))
geo = get_geo_object(from_json)
earth = 6.37122e6
proj_string = (
f"+proj=lcc +lat_0={str(geo.xlat0)} +lon_0={str(geo.xlon0)} "
f"+lat_1={str(geo.xlat0)} +lat_2={str(geo.xlat0)} "
f"+units=m +no_defs +R={str(earth)}"
)
logging.debug(proj_string)
proj = pyproj.CRS.from_string(proj_string)
wgs84 = pyproj.CRS.from_string("EPSG:4326")
shpfile = ogr.Open(infile)
shape = shpfile.GetLayer(0)
# TODO find index
logging.info("TODO: find %s index", catchment)
feature = shape.GetFeature(2562)
feature_dict = json.loads(feature.ExportToJson())
logging.debug(feature_dict["properties"]["stNavn"])
logging.debug(feature_dict)
lons = []
lats = []
values = []
for point in feature_dict["geometry"]["coordinates"][0]:
lons.append(point[0])
lats.append(point[1])
values.append(point[2])
xxx, yyy = pyproj.Transformer.from_crs(wgs84, proj, always_xy=True).transform(
lons, lats
)
x_1 = min(xxx)
x_2 = max(xxx)
y_1 = min(yyy)
y_2 = max(yyy)
logging.debug("%s %s %s %s", x_1, x_2, y_1, y_2)
ring = ogr.Geometry(ogr.wkbLinearRing)
for point, xval in enumerate(xxx):
ring.AddPoint(xval, yyy[point])
poly = ogr.Geometry(ogr.wkbPolygon)
poly.AddGeometry(ring)
ign_x = []
ign_y = []
delta_x = 1000
delta_y = 1000
n_x = int((x_2 - x_1) / delta_x) + 1
n_y = int((y_2 - y_1) / delta_y) + 1
xdx = []
xdy = []
npoints = 0
for x_p in range(0, n_x):
xxx = x_1 + (x_p * delta_x) - delta_x
for y_p in range(0, n_y):
yyy = y_1 + (y_p * delta_y) - delta_y
point = ogr.Geometry(ogr.wkbPoint)
point.AddPoint(xxx, yyy)
if not poly.Intersection(point).IsEmpty():
ign_x.append(xxx)
xdx.append(delta_x)
ign_y.append(yyy)
xdy.append(delta_y)
npoints = npoints + 1
nam_json = {
"nam_pgd_grid": {"cgrid": "IGN"},
"nam_ign": {
"clambert": 7,
"npoints": npoints,
"xx": ign_x,
"xy": ign_y,
"xdx": xdx,
"xdy": xdy,
"xx_llcorner": 0,
"xy_llcorner": 0,
"xcellsize": "250",
"ncols": 0,
"nrows": 0,
},
}
with open(output, "w", encoding="utf-8") as file_handler:
json.dump(nam_json, file_handler, indent=indent)