"""
Utilities for interacting with dflow and ripcas models
Author:
Matthew A. Turner <maturner01@gmail.com>
Date:
9 May 2016
"""
import copy
import six
from netCDF4 import Dataset
from numpy import (array, concatenate, fromstring, meshgrid,
reshape, flipud, isnan)
from scipy.interpolate import griddata
from pandas import read_table, read_excel, Series
[docs]def ripcas_with_dflow_io(vegetation_map, zone_map, streambed_roughness,
shear_nc_path, ripcas_required_data):
"""
Wrapper for using DFLOW input/output with ripcas. Note instead of
shear_map we have shear_nc_path. Use shear_mesh_to_asc to convert the
shear_mesh that comes from D-FLOW to a shear_map for input to ripcas.
When ripcas finishes its vegetation updates, convert the updated
vegetation map to a Manning n-value map for use by DFLOW. See the
ripcas function below for more details on the model and arguments.
Arguments:
vegetation_map (ESRIAsc): location on disk or ESRIAsc
representation of the vegetation map
zone_map (ESRIAsc): location on disk or ESRIAsc representation
of the zone map.
shear_nc_path (str): location on disk of DFLOW shear output netCDF
ripcas_required_data (str): Excel spreadsheet of data needed for
ripcas run. Encompasses the landscape model for the watershed.
Can have one or two 'Code' columns and must have exactly one
'shear_resis' column and exactly one 'n_val' column
Returns:
(Pol): polygon representation of the map of nvalues
"""
if not isinstance(vegetation_map, ESRIAsc):
raise TypeError('vegetation_map must be an ESRIAsc instance')
if not isinstance(zone_map, ESRIAsc):
raise TypeError('vegetation_map must be an ESRIAsc instance')
shear_map = shear_mesh_to_asc(shear_nc_path, vegetation_map.header_dict())
return Pol.from_ascii(
veg2n(
ripcas(
vegetation_map, zone_map, shear_map, ripcas_required_data
), ripcas_required_data, streambed_roughness
)
)
[docs]def ripcas(vegetation_map, zone_map, shear_map, ripcas_required_data):
"""
Simple version of the CASiMiR model for vegetation succession. Before the
model is run, we check that all the unique values from vegetation_map are
present in the shear_resistance_dict. Otherwise the process will fail
wherever the vegetation map value is not present in the dictionary
on lookup.
Arguments:
vegetation_map (str or ESRIAsc): location on disk or ESRIAsc
representation of the vegetation map
zone_map (str or ESRIAsc): location on disk or ESRIAsc representation
of the zone map.
shear_map (str or ESRIAsc): location on disk or ESRIAsc representation
of the shear stress map
ripcas_required_data (str): Excel spreadsheet of data needed for
ripcas run. Encompasses the landscape model for the watershed.
Can have one or two 'Code' columns and must have exactly one
'shear_resis' column and exactly one 'n_val' column
Returns:
(ESRIAsc) vegetation map updated with new values corresponding to
succession rules
"""
if isinstance(vegetation_map, six.string_types):
vegetation_map = ESRIAsc(vegetation_map)
elif not isinstance(vegetation_map, ESRIAsc):
raise TypeError('vegetation_map must be type str or ESRIAsc')
if isinstance(shear_map, six.string_types):
shear_map = ESRIAsc(shear_map)
elif not isinstance(shear_map, ESRIAsc):
raise TypeError('shear_map must be type str or ESRIAsc')
if isinstance(zone_map, six.string_types):
zone_map = ESRIAsc(zone_map)
elif not isinstance(zone_map, ESRIAsc):
raise TypeError('zone_map must be type str of ESRIAsc, not ' +
str(type(zone_map)))
if isinstance(ripcas_required_data, six.string_types):
cas_df = read_excel(ripcas_required_data)
# sanity check to make sure our lookup is correct
assert 'Code.1' in cas_df # TODO generalize later
assert 'shear_resis' in cas_df
shear_resistance_dict = dict(
zip(cas_df['Code.1'], cas_df['shear_resis'])
)
else:
raise TypeError('shear_resistance_dict must be type str')
# init the vegetation map that will be returned
ret_veg_map = copy.deepcopy(vegetation_map)
for idx in range(len(shear_map.data)):
# determine whether or not the vegetation should be reset to age zero
veg_val = int(vegetation_map.data[idx])
if veg_val != 0:
is_not_nodata = (
shear_map.data[idx] != shear_map.NODATA_value and
vegetation_map.data[idx] != vegetation_map.NODATA_value and
vegetation_map.data[idx] > 0
)
veg_needs_reset = (
is_not_nodata and
shear_map.data[idx] > shear_resistance_dict[veg_val]
)
if veg_needs_reset:
# reset vegetation to age zero with veg type appropriate to zone
ret_veg_map.data[idx] = zone_map.data[idx]
# whether or not the vegetation was destroyed, age by one
if is_not_nodata:
ret_veg_map.data[idx] += 1
return ret_veg_map
[docs]def shear_mesh_to_asc(shear_nc_path, header_dict):
"""
Extract flow element values and locations from the dflow output netcdf
and project these onto the grid defined by the corner of the
grid defined by the lower-left corner of the bounding box and the
cell size. The results are saved in ESRI .asc format to asc_out_path.
Arguments:
shear_nc_path (str): location of the dflow netcdf on disk
header_dict (dict): dictionary with six fields as required to build
ESRIAsc; NODATA_value, cellsize, ncols, nrows, yllcorner, xllcorner
Returns:
(ESRIAsc) representation of gridded representation of the mesh shear
stress data output from dflow
"""
dflow_ds = Dataset(shear_nc_path, 'r')
# the mesh locations are the x and y centers of the Flow (Finite) Elements
mesh_x = dflow_ds.variables['FlowElem_xcc'][:]
mesh_y = dflow_ds.variables['FlowElem_ycc'][:]
mesh_shear = dflow_ds.variables['taus'][-1] # take the last timestep
cellsize = header_dict['cellsize']
x = array([header_dict['xllcorner'] + (i*cellsize)
for i in range(header_dict['ncols'])])
y = array([header_dict['yllcorner'] + (i*cellsize)
for i in range(header_dict['nrows'])])
grid_x, grid_y = meshgrid(x, y)
# use linear interp so we don't have to install natgrid
asc_mat = griddata((mesh_x, mesh_y), mesh_shear, (grid_x, grid_y))
# not sure why, but this makes it align with the original vegetation map
asc_mat = flipud(asc_mat)
data = reshape(asc_mat, (header_dict['ncols'] * header_dict['nrows']))
data[isnan(data)] = int(header_dict['NODATA_value'])
return ESRIAsc(data=Series(data), **header_dict)
[docs]def veg2n(veg_map, ripcas_required_data, streambed_roughness):
"""
Creat an ESRIAsc representation of an ESRI .asc file that contains roughness
values substituted for vegetation codes. The translation is found in the
Excel file found at lookup_path.
Arguments:
veg_map (ESRIAsc): path to ESRI .asc file with vegetation codes
ripcas_required_data (str): path to Excel file with vegetation
codes mapped to Manning's roughness n-values. The Excel file must
have four columns with headers
Code shear_resis Code n_val
on the first sheet.
streambed_roughness (float): Manning's roughness value for the
streambed itself, which is represented with zeros in the veg map
Raises:
(ValueError) if there is a vegetation code in the .asc that is not
found in the lookup table
Returns:
(ESRIAsc) ESRI .asc map of Manning's n-values in place of veg codes
"""
assert isinstance(veg_map, ESRIAsc), \
"veg_map must be an instance of ESRIAsc"
cas_df = read_excel(ripcas_required_data)
veg2n_dict = dict(
zip(cas_df['Code.1'], cas_df['n_val'])
)
# add streambed_roughness replacement value
veg2n_dict.update({0: streambed_roughness})
ret = copy.deepcopy(veg_map)
ret.data = veg_map.data.replace(veg2n_dict)
return ret
class ESRIAsc:
def __init__(self, file_path=None, ncols=None, nrows=None,
xllcorner=None, yllcorner=None, cellsize=1,
NODATA_value=-9999, data=None):
self.file_path = file_path
self.ncols = ncols
self.nrows = nrows
self.xllcorner = xllcorner
self.yllcorner = yllcorner
self.cellsize = cellsize
self.NODATA_value = NODATA_value
if data is not None and not isinstance(data, Series):
raise RuntimeError('data value must be type pandas.Series')
self.data = data
# if a file is provided, the file metadata will overwrite any
# user-provided kwargs
if file_path:
def getnextval(f):
return f.readline().strip().split()[1]
f = open(file_path, 'r')
self.ncols = int(getnextval(f))
self.nrows = int(getnextval(f))
self.xllcorner = float(getnextval(f))
self.yllcorner = float(getnextval(f))
self.cellsize = int(getnextval(f))
self.NODATA_value = float(getnextval(f))
# should not be necessary for well-formed ESRI files, but
# seems to be for CASiMiR
data_str = ' '.join([l.strip() for l in f.readlines()])
self.data = Series(fromstring(data_str, dtype=float, sep=' '))
colrow_prod = self.nrows*self.ncols
assert len(self.data) == colrow_prod, \
"length of .asc data does not equal product of ncols * nrows" \
"\nncols: {}, nrows: {}, ncols*nrows: {} len(data): {}".format(
self.ncols, self.nrows, colrow_prod, len(self.data))
def header_dict(self):
return dict(ncols=self.ncols,
nrows=self.nrows,
xllcorner=self.xllcorner,
yllcorner=self.yllcorner,
cellsize=self.cellsize,
NODATA_value=self.NODATA_value)
def as_matrix(self, replace_nodata_val=None):
"""
Convenience method to give 2D numpy.ndarray representation. If
replace_nodata_val is given, replace all NODATA_value entries with
it.
Arguments:
replace_nodata_val (float): value with which to replace
NODATA_value entries
Returns:
(numpy.ndarray) matrix representation of the data in the .asc
"""
ret = copy.copy(reshape(self.data, (self.nrows, self.ncols)))
if replace_nodata_val is not None:
ret[ret == self.NODATA_value] = replace_nodata_val
return ret
def write(self, write_path):
# replace nan with NODATA_value
self.data = self.data.fillna(self.NODATA_value)
with open(write_path, 'w+') as f:
f.write("ncols {}\n".format(self.ncols))
f.write("nrows {}\n".format(self.nrows))
f.write("xllcorner {}\n".format(self.xllcorner))
f.write("yllcorner {}\n".format(self.yllcorner))
f.write("cellsize {}\n".format(self.cellsize))
f.write("NODATA_value {}\n".format(self.NODATA_value))
# prob not most efficient, but CASiMiR requires
# ESRI Ascii w/ newlines
f.write(
'\n'.join(
[
' '.join([str(v) for v in row])
for row in self.as_matrix()
]
)
)
def __eq__(self, other):
if isinstance(other, ESRIAsc):
ret = self.ncols == other.ncols
ret = self.nrows == other.nrows and ret
ret = self.xllcorner == other.xllcorner and ret
ret = self.yllcorner == other.yllcorner and ret
ret = self.cellsize == other.cellsize and ret
ret = self.NODATA_value == other.NODATA_value and ret
ret = all(self.data == other.data) and ret
return ret
return NotImplemented
[docs]class Pol:
"""
Wrapper creating and/or reading the .pol files of n-values for DFLOW
"""
df = None
x = None
y = None
n = None
@classmethod
def from_dflow_file(cls, pol_path=None):
c = cls()
if pol_path is not None:
c.df = read_table(
pol_path, skiprows=2, skipinitialspace=True, sep=' ',
header=None, names=['x', 'y', 'z'], engine='python'
)
c.x = array(c.df.x)
c.y = array(c.df.y)
c.z = array(c.df.z)
return c
@classmethod
[docs] def from_ascii(cls, asc):
"""
Generate a polygon file from ESRIAsc
"""
assert isinstance(asc, ESRIAsc), "asc input must be ESRIAsc"
c = cls()
grid = meshgrid(
array(
[asc.xllcorner + asc.cellsize*i for i in range(asc.ncols)]
),
array(
[asc.yllcorner + asc.cellsize*i for i in range(asc.nrows)]
)
)
c.x = concatenate(grid[0])
c.y = concatenate(grid[1])
c.z = array(asc.data)
return c
@classmethod
def from_river_geometry_file(cls, geom_file):
c = cls()
df = read_table(geom_file, sep='\s+', engine='python')
c.df = df
c.x = array(df.X)
c.y = array(df.Y)
c.z = array(df.Z)
return c
[docs] def write(self, out_path):
"""
write the Pol to file
Returns: None
"""
if self.x is not None and self.y is not None and self.z is not None:
assert len(self.x) == len(self.y) and len(self.y) == len(self.z), \
"Lengths of columns is not equal!"
with open(out_path, 'w') as f:
f.write('L1\n')
# weird, but apparently what DFLOW requires
f.write(' '*4 + str(len(self.x)) + ' '*5 + '3\n')
lines = (
' '*3 + (' '*3).join(["{:1.7e}".format(val) for val in el])
for el in zip(self.x, self.y, self.z)
)
for l in lines:
f.write(l + '\n')
else:
raise Exception("Trying to write an incomplete polygon file")
def __eq__(self, other):
if isinstance(other, Pol):
ret = all(self.x == other.x)
ret = all(ret and self.y == other.y)
ret = all(ret and self.z == other.z)
return ret
return NotImplemented
if __name__ == '__main__':
help_msg = '''
dflow_ripcas v1.0 - Matthew Turner <maturner01@gmail.com>
Usage:
python jemez/dflow_ripcas.py <path-to-shear-nc> <path-to-save-output-vegmap>
Example:
$ python jemez/dflow_ripcas.py ~/local_data/dflow_outputs/jemez_r02_map.nc ~/local_data/ripcas_out/veg-out-1.asc
'''
import sys
try:
shear_nc = sys.argv[1]
vegout_path = sys.argv[2]
if shear_nc == '-h':
print help_msg
sys.exit(0)
except:
print help_msg
sys.exit(1)
vegetation_map = ESRIAsc('data/vegclass_2z.asc')
zone_map = ESRIAsc('data/zonemap_2z.asc')
ripcas_data = 'data/ripcas-data-requirements.xlsx'
vegout = ripcas_with_dflow_io(
vegetation_map, zone_map, shear_nc, ripcas_data
)
vegout.write(vegout_path)