Source code for pysurfex.forcing

"""Forcing."""
import abc
import copy
import json
import logging
import os
import shutil
import time

import netCDF4
import numpy as np
import toml

from .cache import Cache
from .configuration import ConfigurationFromHarmonie
from .datetime_utils import as_datetime, as_timedelta
from .file import ForcingFileNetCDF
from .geo import get_geo_object
from .read import ConstantValue, ConvertedInput, Converter
from .util import deep_update


# TODO: should be abstract?
[docs]class SurfexForcing(object): """Surfex forcing class."""
[docs] def __init__(self): """Construct object."""
[docs]class SurfexNetCDFForcing(SurfexForcing): """NetCDF surfex forcing."""
[docs] def __init__(self, filename, geo): """Construct netcdf forcing.""" SurfexForcing.__init__(self) self.io_object = ForcingFileNetCDF(filename, geo)
[docs]class SurfexOutputForcing(object): """Main output class for SURFEX forcing.""" __metaclass__ = abc.ABCMeta
[docs] def __init__(self, base_time, geo, ntimes, var_objs, time_step_intervall): """Construct output forcing. Args: base_time (_type_): _description_ geo (_type_): _description_ ntimes (_type_): _description_ var_objs (_type_): _description_ time_step_intervall (_type_): _description_ """ self.time_step_intervall = time_step_intervall self.valid_time = None self.base_time = base_time self.geo = geo self.ntimes = ntimes self.time_step = 0 self.time_step_value = 0 self.var_objs = var_objs self._check_sanity()
[docs] def _check_sanity(self): if len(self.var_objs) != self.nparameters: raise Exception( f"Inconsistent number of parameter. {str(len(self.var_objs))} != " f"{str(self.nparameters)}" ) # Check if all parameters are present for var_obj in self.var_objs: self.parameters[var_obj.var_name] = 1 for key, value in self.parameters.items(): if value == 0: raise Exception(f"Required parameter {str(key)} is missing!")
# Time dependent parameter nparameters = 11 parameters = { "TA": 0, "QA": 0, "PS": 0, "DIR_SW": 0, "SCA_SW": 0, "LW": 0, "RAIN": 0, "SNOW": 0, "WIND": 0, "WIND_DIR": 0, "CO2": 0, }
[docs] @abc.abstractmethod def write_forcing(self, var_objs, this_time, cache): """Write forcing.""" raise NotImplementedError("users must define writeForcing to use this base class")
[docs]class NetCDFOutput(SurfexOutputForcing): """Forcing in NetCDF format.""" forcing_file = {} translation = { "TA": "Tair", "QA": "Qair", "PS": "PSurf", "DIR_SW": "DIR_SWdown", "SCA_SW": "SCA_SWdown", "LW": "LWdown", "RAIN": "Rainf", "SNOW": "Snowf", "WIND": "Wind", "WIND_DIR": "Wind_DIR", "CO2": "CO2air", } def __init__( self, base_time, geo, fname, ntimes, var_objs, att_objs, att_time, cache, time_step, fmt="netcdf", diskless_write=False, ): """Construct netcdf forcing. Args: base_time (_type_): _description_ geo (_type_): _description_ fname (_type_): _description_ ntimes (_type_): _description_ var_objs (_type_): _description_ att_objs (_type_): _description_ att_time (_type_): _description_ cache (_type_): _description_ time_step (_type_): _description_ fmt (str, optional): _description_. Defaults to "netcdf". diskless_write: sets diskelss=True and persist=True Raises: NotImplementedError: NotImplementedError """ SurfexOutputForcing.__init__(self, base_time, geo, ntimes, var_objs, time_step) if fmt == "netcdf": self.output_format = "NETCDF3_64BIT" elif fmt == "nc4": self.output_format = "NETCDF4" else: raise NotImplementedError(format) logging.info("Forcing type is netCDF") self.forcing_file = {} if fname is None: fname = "FORCING.nc" self.fname = fname self.tmp_fname = self.fname + ".tmp" self.file_handler = netCDF4.Dataset( self.tmp_fname, "w", format=self.output_format, diskless=diskless_write, persist=diskless_write, ) self._define_forcing(geo, att_objs, att_time, cache)
[docs] def write_forcing(self, var_objs, this_time, cache): """Write forcing. Args: var_objs (_type_): _description_ this_time (_type_): _description_ cache (_type_): _description_ """ # VARS for this_obj in self.var_objs: this_var = this_obj.var_name logging.info("Preparing variable %s", this_obj.var_name) tic = time.time() field = this_obj.read_time_step(this_time, cache) field = field.reshape([self.geo.nlats, self.geo.nlons], order="F").flatten() toc = time.time() logging.info("Preparation took %s seconds", str(toc - tic)) self.forcing_file[self.translation[this_var]][self.time_step, :] = field self.forcing_file["TIME"][self.time_step] = self.time_step_value
[docs] def _define_forcing(self, geo, att_objs, att_time, cache): logging.info("Define netcdf forcing") zs_oro = None zref = None uref = None for this_obj in att_objs: this_var = this_obj.var_name logging.info("Define: %s", this_obj.var_name) if this_var == "ZS": zs_oro = this_obj.read_time_step(att_time, cache) zs_oro = zs_oro.reshape( [self.geo.nlats, self.geo.nlons], order="F" ).flatten() elif this_var == "ZREF": zref = this_obj.read_time_step(att_time, cache) zref = zref.reshape([self.geo.nlats, self.geo.nlons], order="F").flatten() elif this_var == "UREF": uref = this_obj.read_time_step(att_time, cache) uref = uref.reshape([self.geo.nlats, self.geo.nlons], order="F").flatten() # DIMS self.forcing_file["NPOINTS"] = self.file_handler.createDimension( "Number_of_points", geo.npoints ) self.forcing_file["NTIMES"] = self.file_handler.createDimension( "time", self.ntimes ) # DEFINE VARS self.forcing_file["TIME"] = self.file_handler.createVariable( "time", "f4", ("time",) ) self.forcing_file["TIME"].units = ( "hours since " + f"{self.base_time.strftime('%Y-%m-%d %H')}:00:00 0:00" ) self.forcing_file["TSTEP"] = self.file_handler.createVariable( "FRC_TIME_STP", "f4" ) self.forcing_file["TSTEP"].longname = "Forcing_Time_Step" self.forcing_file["TSTEP"][:] = self.time_step_intervall self.forcing_file["LON"] = self.file_handler.createVariable( "LON", "f4", ("Number_of_points",) ) self.forcing_file["LON"].longname = "Longitude" self.forcing_file["LON"][:] = geo.lonlist self.forcing_file["LAT"] = self.file_handler.createVariable( "LAT", "f4", ("Number_of_points",) ) self.forcing_file["LAT"].longname = "Latitude" self.forcing_file["LAT"][:] = geo.latlist self.forcing_file["ZS"] = self.file_handler.createVariable( "ZS", "f4", ("Number_of_points",) ) self.forcing_file["ZS"].longname = "Surface_Orography" self.forcing_file["ZS"][:] = zs_oro self.forcing_file["ZREF"] = self.file_handler.createVariable( "ZREF", "f4", ("Number_of_points",) ) self.forcing_file["ZREF"].longname = "Reference_Height" self.forcing_file["ZREF"].units = "m" self.forcing_file["ZREF"][:] = zref self.forcing_file["UREF"] = self.file_handler.createVariable( "UREF", "f4", ("Number_of_points",) ) self.forcing_file["UREF"].longname = "Reference_Height_for_Wind" self.forcing_file["UREF"].units = "m" self.forcing_file["UREF"][:] = uref # Define time dependent variables for this_obj in self.var_objs: this_var = this_obj.var_name if this_var == "TA": self.forcing_file["Tair"] = self.file_handler.createVariable( "Tair", "f4", ( "time", "Number_of_points", ), ) self.forcing_file["Tair"].longname = "Near_Surface_Air_Temperature" self.forcing_file["Tair"].units = "K" elif this_var == "QA": self.forcing_file["Qair"] = self.file_handler.createVariable( "Qair", "f4", ( "time", "Number_of_points", ), ) self.forcing_file["Qair"].longname = "Near_Surface_Specific_Humidity" self.forcing_file["Qair"].units = "kg/kg" elif this_var == "PS": self.forcing_file["PSurf"] = self.file_handler.createVariable( "PSurf", "f4", ( "time", "Number_of_points", ), ) self.forcing_file["PSurf"].longname = "Surface_Pressure" self.forcing_file["PSurf"].units = "Pa" elif this_var == "DIR_SW": self.forcing_file["DIR_SWdown"] = self.file_handler.createVariable( "DIR_SWdown", "f4", ( "time", "Number_of_points", ), ) self.forcing_file[ "DIR_SWdown" ].longname = "Surface_Incident_Downwelling_Shortwave_Radiation" self.forcing_file["DIR_SWdown"].units = "W/m2" elif this_var == "SCA_SW": self.forcing_file["SCA_SWdown"] = self.file_handler.createVariable( "SCA_SWdown", "f4", ( "time", "Number_of_points", ), ) self.forcing_file[ "SCA_SWdown" ].longname = "Surface_Incident_Diffuse_Shortwave_Radiation" self.forcing_file["SCA_SWdown"].units = "W/m2" elif this_var == "LW": self.forcing_file["LWdown"] = self.file_handler.createVariable( "LWdown", "f4", ( "time", "Number_of_points", ), ) self.forcing_file[ "LWdown" ].longname = "Surface_Incident_Diffuse_Longwave_Radiation" self.forcing_file["LWdown"].units = "W/m2" elif this_var == "RAIN": self.forcing_file["Rainf"] = self.file_handler.createVariable( "Rainf", "f4", ( "time", "Number_of_points", ), ) self.forcing_file["Rainf"].longname = "Rainfall_Rate" self.forcing_file["Rainf"].units = "kg/m2/s" elif this_var == "SNOW": self.forcing_file["Snowf"] = self.file_handler.createVariable( "Snowf", "f4", ( "time", "Number_of_points", ), ) self.forcing_file["Snowf"].longname = "Snowfall_Rate" self.forcing_file["Snowf"].units = "kg/m2/s" elif this_var == "WIND": self.forcing_file["Wind"] = self.file_handler.createVariable( "Wind", "f4", ( "time", "Number_of_points", ), ) self.forcing_file["Wind"].longname = "Wind_Speed" self.forcing_file["Wind"].units = "m/s" elif this_var == "WIND_DIR": self.forcing_file["Wind_DIR"] = self.file_handler.createVariable( "Wind_DIR", "f4", ( "time", "Number_of_points", ), ) self.forcing_file["Wind_DIR"].longname = "Wind_Direction" elif this_var == "CO2": self.forcing_file["CO2air"] = self.file_handler.createVariable( "CO2air", "f4", ( "time", "Number_of_points", ), ) self.forcing_file["CO2air"].longname = "Near_Surface_CO2_Concentration" self.forcing_file["CO2air"].units = "kg/m3" else: raise NotImplementedError( f"This should never happen! {this_var} is not defined!" )
[docs] def finalize(self): """Finalize the forcing. Close the file.""" logging.debug("Close file") self.file_handler.close() shutil.move(self.tmp_fname, self.fname)
[docs]class AsciiOutput(SurfexOutputForcing): """Forcing in ASCII format.""" def __init__( self, base_time, geo, fname, ntimes, var_objs, att_objs, att_time, cache, time_step, ): """Construct ASCII forcing output.""" SurfexOutputForcing.__init__(self, base_time, geo, ntimes, var_objs, time_step) self.output_format = "ascii" logging.info("Forcing type is ASCII") self.forcing_file = {} self.file_handler = {} if fname is None: fname = "Params_config.txt" self.fname = fname self._define_forcing(geo, att_objs, att_time, cache)
[docs] def write_forcing(self, var_objs, this_time, cache): """Write forcing. Args: var_objs (_type_): _description_ this_time (_type_): _description_ cache (_type_): _description_ """ for this_obj in self.var_objs: this_var = this_obj.var_name logging.info("Write var name %s", this_obj.var_name) field = this_obj.read_time_step(this_time, cache) fmt = "%20.8f" cols = 50 write_formatted_array(self.file_handler[this_var], field, cols, fmt)
[docs] def _define_forcing(self, geo, att_objs, att_time, cache): zs_oro = None zref = None uref = None for this_obj in att_objs: this_var = this_obj.var_name if this_var == "ZS": zs_oro = this_obj.read_time_step(att_time, cache) elif this_var == "ZREF": zref = this_obj.read_time_step(att_time, cache) elif this_var == "UREF": uref = this_obj.read_time_step(att_time, cache) second = ( self.base_time - self.base_time.replace(hour=0, minute=0, second=0, microsecond=0) ).total_seconds() fmt = "%15.8f" cols = 50 file_handler = open(self.fname, "w", encoding="utf-8") file_handler.write(str(geo.npoints) + "\n") file_handler.write(str(self.ntimes) + "\n") file_handler.write(str(self.time_step_intervall) + "\n") file_handler.write(self.base_time.strftime("%Y") + "\n") file_handler.write(self.base_time.strftime("%m") + "\n") file_handler.write(self.base_time.strftime("%d") + "\n") file_handler.write(str(second) + "\n") write_formatted_array(file_handler, geo.lons, cols, fmt) write_formatted_array(file_handler, geo.lats, cols, fmt) write_formatted_array(file_handler, zs_oro, cols, fmt) write_formatted_array(file_handler, zref, cols, fmt) write_formatted_array(file_handler, uref, cols, fmt) file_handler.close() for key in self.parameters: nam = key if key == "WIND_DIR": nam = "DIR" self.forcing_file[key] = "Forc_" + nam + ".txt" self.file_handler[key] = open(self.forcing_file[key], "w", encoding="utf-8")
[docs] def finalize(self): """Finalize forcing.""" logging.debug("Close file") for key in self.parameters: self.file_handler[key].close()
[docs]def write_formatted_array(file, array, columns, fileformat): """Write a formatted array.""" astr = np.empty(array.size - array.size % columns, dtype="float64") astr = array[0 : astr.size] astr = astr.reshape((columns, astr.size / columns), order="F") mlw = (len(fileformat % 0)) * (columns + 1) formatter = {"float_kind": lambda x: fileformat % x} astr_end = np.array2string( array[astr.size :], separator="", max_line_width=mlw, formatter=formatter )[1:-1] np.savetxt(file, astr.T, fmt=fileformat, newline="\n", delimiter="") file.write(astr_end + "\n")
[docs]def run_time_loop(options, var_objs, att_objs): """Run time loop.""" tic = time.time() this_time = options["start"] cache = Cache(options["cache_interval"]) time_step = options["timestep"] single = False if "single" in options: single = options["single"] # Find how many time steps we want to write ntimes = 0 while this_time <= options["stop"]: ntimes = ntimes + 1 this_time = this_time + as_timedelta(seconds=options["timestep"]) if single: time_step = 1 if ntimes == 1: ntimes = 2 logging.info("Print single time step twice %s", 0) else: raise Exception("Option single should be used with one time step") # Create output object if ( str.lower(options["output_format"]) == "netcdf" or str.lower(options["output_format"]) == "nc4" ): # Set att_time the same as start att_time = options["start"] output = NetCDFOutput( options["start"], options["geo_out"], options["output_file"], ntimes, var_objs, att_objs, att_time, cache, time_step, fmt=str.lower(options["output_format"]), diskless_write=options["diskless_write"], ) elif str.lower(options["output_format"]) == "ascii": att_time = options["start"] output = AsciiOutput( options["start"], options["geo_out"], options["output_file"], ntimes, var_objs, att_objs, att_time, cache, time_step, ) else: raise NotImplementedError("Invalid output format " + options["output_format"]) # Loop output time steps this_time = options["start"] while this_time <= options["stop"]: # Write for each time step logging.info( "Creating forcing for: %s time_step: %s", this_time.strftime("%Y%m%d%H"), str(output.time_step), ) output.write_forcing(var_objs, this_time, cache) output.time_step = output.time_step + 1 if not single: output.time_step_value = output.time_step this_time = this_time + as_timedelta(seconds=options["timestep"]) if cache is not None: cache.clean_fields(this_time) else: output.time_step_value = 0 if output.time_step > 1: this_time = this_time + as_timedelta(seconds=options["timestep"]) # Finalize forcing output.finalize() toc = time.time() logging.info("Forcing generation took %s seconds", str(toc - tic))
def set_input_object( sfx_var, merged_conf, geo, forcingformat, selected_converter, ref_height, first_base_time, timestep, ): """Set the input parameter for a specific SURFEX forcing variable based on input. Args: sfx_var (str): _description_ merged_conf (_type_): _description_ geo (_type_): _description_ forcingformat (_type_): _description_ selected_converter (_type_): _description_ ref_height (_type_): _description_ first_base_time (_type_): _description_ timestep (_type_): _description_ Returns: _type_: _description_ Raises: KeyError: KeyError """ ######################################### # 1. Gobal configuration from yaml file ######################################### conf = copy.deepcopy(merged_conf) # Now we know the CLA settings for each surfex variable # Set defaults (merged global and user settings) # Theses must be sent to the converter to be used for each variable defs = {} if forcingformat in conf: defs = copy.deepcopy(conf[forcingformat]) defs.update({"timestep": timestep}) # All objects with converters, find converter dict entry conf_dict = {} if forcingformat != "constant": # Non-height dependent variables if ref_height is None: if "converter" in conf[sfx_var][forcingformat]: conf_dict = copy.deepcopy(conf[sfx_var][forcingformat]["converter"]) else: raise KeyError("No converter defined for " + sfx_var) # Variables with height dependency else: if ref_height in conf[sfx_var]: if forcingformat not in conf[sfx_var][ref_height]: msg = ( f"{str(conf[sfx_var])}: " + f"Missing definitions for {sfx_var} and format: {forcingformat}" ) raise KeyError(msg) if conf[sfx_var][ref_height][forcingformat] is None: raise KeyError( f"{str(conf[sfx_var])}: Missing definitions for {sfx_var}" ) if "converter" in conf[sfx_var][ref_height][forcingformat]: conf_dict = copy.deepcopy( conf[sfx_var][ref_height][forcingformat]["converter"] ) else: raise KeyError("No converter defined for " + sfx_var) else: raise KeyError( 'No ref height "' + ref_height + '" defined for ' + sfx_var ) ############################################################## ############################################################## ############################################################## # Create the object to be returned if forcingformat == "constant": if ref_height is None: if "constant" in conf[sfx_var]: const_dict = copy.deepcopy(conf[sfx_var]["constant"]) else: raise KeyError("No constant defined for " + sfx_var) else: if ref_height in conf[sfx_var]: if "constant" in conf[sfx_var][ref_height]: const_dict = copy.deepcopy(conf[sfx_var][ref_height]["constant"]) else: raise KeyError("No constant defined for " + sfx_var) else: raise KeyError( 'No ref height "' + ref_height + '" defined for ' + sfx_var ) obj = ConstantValue(geo, sfx_var, const_dict) else: # Construct the converter converter = Converter( selected_converter, first_base_time, defs, conf_dict, forcingformat ) # Construct the input object obj = ConvertedInput(geo, sfx_var, converter) return obj
[docs]def set_forcing_config(**kwargs): """Set the forcing config.""" file_base = None if "harmonie" in kwargs and kwargs["harmonie"]: config_exp = None if "config_exp_surfex" in kwargs: if kwargs["config_exp_surfex"] is not None: config_exp = kwargs["config_exp_surfex"] if config_exp is None: config_exp = ( f"{os.path.abspath(os.path.dirname(__file__))}/cfg/config_exp_surfex.toml" ) logging.info("Using default config from: %s", config_exp) input_data = toml.load(open(config_exp, "r", encoding="utf-8")) config = ConfigurationFromHarmonie(os.environ, input_data) geo_out = config.geo elif "domain" in kwargs and kwargs["domain"] is not None: geo_out = get_geo_object(json.load(open(kwargs["domain"], "r", encoding="utf-8"))) else: raise Exception("No geometry is set") user_config = {} pattern = None timestep = 3600 cache_interval = 3600 zsoro = "default" zsoro_converter = "none" zval = "default" zval_converter = "none" uval = "default" uval_converter = "none" tair = "default" tair_converter = "none" qair = "default" qair_converter = "none" psurf = "default" psurf_converter = "none" dir_sw = "default" dir_sw_converter = "none" sca_sw = "default" sca_sw_converter = "none" lw_rad = "default" lw_rad_converter = "none" rain = "default" rain_converter = "none" snow = "default" snow_converter = "none" wind = "default" wind_converter = "none" wind_dir = "default" wind_dir_converter = "none" co2 = "default" co2_converter = "none" analysis = False interpolation = None try: dtg_start = kwargs["dtg_start"] dtg_stop = kwargs["dtg_stop"] input_format = kwargs["input_format"] output_format = kwargs["output_format"] outfile = kwargs["of"] diskless_write = kwargs["diskless_write"] zref = kwargs["zref"] uref = kwargs["uref"] config = kwargs["config"] if "interpolation" in kwargs: interpolation = kwargs["interpolation"] if "analysis" in kwargs: analysis = kwargs["analysis"] if "fb" in kwargs: file_base = kwargs["fb"] if "geo_out" in kwargs: geo_out = kwargs["geo_out"] if "user_config" in kwargs: user_config = kwargs["user_config"] if "pattern" in kwargs: pattern = kwargs["pattern"] if "timestep" in kwargs: timestep = kwargs["timestep"] if "cache_interval" in kwargs: cache_interval = kwargs["cache_interval"] if "zsoro" in kwargs: zsoro = kwargs["zsoro"] if "zsoro_converter" in kwargs: zsoro_converter = kwargs["zsoro_converter"] if "zval" in kwargs: zval = kwargs["zval"] if "zval_converter" in kwargs: zval_converter = kwargs["zval_converter"] if "uval_converter" in kwargs: uval_converter = kwargs["uval_converter"] if "uval" in kwargs: uval = kwargs["uval"] if "ta" in kwargs: tair = kwargs["ta"] if "ta_converter" in kwargs: tair_converter = kwargs["ta_converter"] if "qa" in kwargs: qair = kwargs["qa"] if "qa_converter" in kwargs: qair_converter = kwargs["qa_converter"] if "ps" in kwargs: psurf = kwargs["ps"] if "ps_converter" in kwargs: psurf_converter = kwargs["ps_converter"] if "dir_sw" in kwargs: dir_sw = kwargs["dir_sw"] if "dir_sw_converter" in kwargs: dir_sw_converter = kwargs["dir_sw_converter"] if "sca_sw" in kwargs: sca_sw = kwargs["sca_sw"] if "sca_sw_converter" in kwargs: sca_sw_converter = kwargs["sca_sw_converter"] if "lw" in kwargs: lw_rad = kwargs["lw"] if "lw_converter" in kwargs: lw_rad_converter = kwargs["lw_converter"] if "rain" in kwargs: rain = kwargs["rain"] if "rain_converter" in kwargs: rain_converter = kwargs["rain_converter"] if "snow" in kwargs: snow = kwargs["snow"] if "snow_converter" in kwargs: snow_converter = kwargs["snow_converter"] if "wind" in kwargs: wind = kwargs["wind"] if "wind_converter" in kwargs: wind_converter = kwargs["wind_converter"] if "wind_dir" in kwargs: wind_dir = kwargs["wind_dir"] if "wind_dir_converter" in kwargs: wind_dir_converter = kwargs["wind_dir_converter"] if "co2" in kwargs: co2 = kwargs["co2"] if "co2_converter" in kwargs: co2_converter = kwargs["co2_converter"] except ValueError: raise Exception("Needed input is missing") from ValueError # Time information if (int(dtg_start) or int(dtg_stop)) < 1000010100: raise Exception( "Invalid start and stop times! " + str(dtg_start) + " " + str(dtg_stop) ) start = as_datetime(str.strip(str(dtg_start))) stop = as_datetime(str.strip(str(dtg_stop))) if file_base is None: first_base_time = start else: first_base_time = as_datetime(str.strip(str(file_base))) # Merge all settings with user all settings merged_conf = deep_update(config, user_config) # Replace global settings from fileformat = input_format if pattern is not None: merged_conf[fileformat]["filepattern"] = pattern # Interpolation merged_conf[fileformat]["interpolation"] = interpolation # Prefer forecast or analysis if analysis: merged_conf[fileformat]["prefer_forecast"] = False merged_conf[fileformat]["fcint"] = 3600.0 merged_conf[fileformat]["offset"] = 0 if "geo_input" in kwargs: geo_input = kwargs["geo_input"] if geo_input is not None: if os.path.exists(geo_input): geo_input = get_geo_object( json.load(open(geo_input, "r", encoding="utf-8")) ) merged_conf[fileformat]["geo_input"] = geo_input else: logging.info("Input geometry %s does not exist", geo_input) # Set attributes atts = ["ZS", "ZREF", "UREF"] att_objs = [] for att_var in atts: # Override with command line options for a given variable ref_height = None cformat = fileformat if att_var == "ZS": if zsoro != "default": cformat = zsoro selected_converter = zsoro_converter elif att_var == "ZREF": if zval != "default": cformat = zval selected_converter = zval_converter ref_height = zref if ref_height == "screen": cformat = "constant" elif att_var == "UREF": if uval != "default": cformat = uval selected_converter = uval_converter ref_height = uref if ref_height == "screen": cformat = "constant" else: raise NotImplementedError att_objs.append( set_input_object( att_var, merged_conf, geo_out, cformat, selected_converter, ref_height, first_base_time, timestep, ) ) # Set forcing variables (time dependent) variables = [ "TA", "QA", "PS", "DIR_SW", "SCA_SW", "LW", "RAIN", "SNOW", "WIND", "WIND_DIR", "CO2", ] var_objs = [] # Search in config file for parameters to override for sfx_var in variables: ref_height = None cformat = fileformat if sfx_var == "TA": if tair != "default": cformat = tair selected_converter = tair_converter ref_height = zref elif sfx_var == "QA": if qair != "default": cformat = qair selected_converter = qair_converter ref_height = zref elif sfx_var == "PS": if psurf != "default": cformat = psurf selected_converter = psurf_converter elif sfx_var == "DIR_SW": if dir_sw != "default": cformat = dir_sw selected_converter = dir_sw_converter elif sfx_var == "SCA_SW": if sca_sw != "default": cformat = sca_sw selected_converter = sca_sw_converter elif sfx_var == "LW": if lw_rad != "default": cformat = lw_rad selected_converter = lw_rad_converter elif sfx_var == "RAIN": if rain != "default": cformat = rain selected_converter = rain_converter elif sfx_var == "SNOW": if snow != "default": cformat = snow selected_converter = snow_converter elif sfx_var == "WIND": if wind != "default": cformat = wind selected_converter = wind_converter ref_height = uref elif sfx_var == "WIND_DIR": if wind_dir != "default": cformat = wind_dir selected_converter = wind_dir_converter ref_height = uref elif sfx_var == "CO2": if co2 != "default": cformat = co2 selected_converter = co2_converter else: raise NotImplementedError var_objs.append( set_input_object( sfx_var, merged_conf, geo_out, cformat, selected_converter, ref_height, first_base_time, timestep, ) ) # Save options options = dict() options["output_format"] = output_format options["output_file"] = outfile options["diskless_write"] = diskless_write options["start"] = start options["stop"] = stop options["timestep"] = timestep options["geo_out"] = geo_out options["single"] = False if "single" in kwargs: options["single"] = kwargs["single"] options["cache_interval"] = cache_interval return options, var_objs, att_objs
[docs]def modify_forcing(**kwargs): """Modify forcing.""" infile = kwargs["input_file"] outfile = kwargs["output_file"] time_step = kwargs["time_step"] variables = kwargs["variables"] ifile = netCDF4.Dataset(infile, "r") ofile = netCDF4.Dataset(outfile, "r+") for var in variables: print("Modify variable " + var) print( "input", ifile[var][time_step, :], ifile[var][time_step, :].shape, time_step ) print("output", ofile[var][0, :], ofile[var][0, :].shape) ofile[var][0, :] = ifile[var][time_step, :] ofile.sync() ifile.close() ofile.close()