"""
Functions to calculate the downstream water surface elevation by minimizing the
difference between flows calculated via the Manning Formula for discharge and
the historical peak flood values.
(https://en.wikipedia.org/wiki/Manning_formula)
(https://en.wikipedia.org/wiki/Volumetric_flow_rate)
Author:
Matthew A. Turner <maturner01@gmail.com>
Date:
19 April 2016
"""
import numpy as np
import os
import shutil
import subprocess
import time
from collections import namedtuple
from scipy.optimize import minimize_scalar
from ripcas_dflow import ESRIAsc, Pol, ripcas, shear_mesh_to_asc, veg2n
[docs]class ModelRun(object):
"""
A single coupled run. First DFLOW then RipCAS. CoupledRunSequence will
encapsulate a series of coupled runs commencing with preparation of the
initial vegetation map for DFLOW. For now, assume that the vegetation map
is provided to the run_dflow method.
"""
def __init__(self): # , vegetation_ascii_path):
# have the boundary conditions been found?
self.bc_converged = False
# has ripcas been run yet?
self.vegetation_ascii = None
self.ripcas_has_run = False
self.ripcas_directory = None
self.dflow_has_run = False
self.dflow_run_directory = None
self.dflow_shear_output = None
self.upstream_bc = BoundaryCondition()
self.downstream_bc = BoundaryCondition()
self.bc_solution_info = BoundarySolutionInfo()
[docs] def calculate_bc(self, target_streamflow,
dbc_geometry_file, streambed_roughness, slope):
"""
Arguments:
target_streamflow (float): historical or other streamflow that
will be used to drive DFLOW model; this calculation recovers
an estimate for the Water Surface elevation (WS) for this given
streamflow.
dbc_geometry_file (str): path to the stream's cross-sectional
geometry xyz file
streambed_roughness (float): Manning's n-value for the streambed
slope (float): slope taken for the reach
Returns:
(BoundaryCondition, BoundaryCondition): tuple of upstream and
downstream BoundaryCondition instances
"""
dbc_geometry = Pol.from_river_geometry_file(dbc_geometry_file)
bc_solver = BoundaryConditionSolver(
target_streamflow, dbc_geometry, streambed_roughness, slope
)
bc_solution = bc_solver.solve()
self.bc_solution_info = bc_solution
self.bc_converged = bc_solution.success
self.downstream_bc.amplitude = bc_solution.ws_elev
self.upstream_bc.amplitude = bc_solution.streamflow
return (self.upstream_bc, self.downstream_bc)
[docs] def run_dflow(self, dflow_run_directory, vegetation_map,
veg_roughness_shearres_lookup, streambed_roughness,
clobber=True, pbs_script_name='dflow_mpi.pbs',
dflow_run_fun=None):
"""
Both input and output dflow files will go into the dflow_run_directory,
but in input/ and output/ subdirectories.
Arguments:
dflow_run_directory (str): directory where DFLOW files should be
put and where the dflow_run_fun will be run from
vegetation_map (str): path to the input vegetation.pol file. This
function assumes this has already been generated in the proper
format b/c this seems like the best separation of
responsibilities.
clobber (bool): whether or not to overwrite dflow_run_directory if
it exists
pbs_script_name (str): name of .pbs script w/o directory
dflow_run_fun (function): argument-free function to run DFLOW.
Ex. `dflow_run_fun=f` where `f` defined by
`def f: subprocess.call(['qsub', 'dflow_mpi.pbs'])`
Returns:
None
"""
if not self.bc_converged:
raise RuntimeError(
'Boundary conditions must be calculated before ' +
'DFLOW can be run'
)
if self.dflow_has_run:
raise RuntimeError(
'DFLOW has already been run for this CoupledRun'
)
if os.path.exists(dflow_run_directory):
if not clobber:
raise RuntimeError(
'DFLOW has already been run for this CoupledRun'
)
shutil.rmtree(dflow_run_directory)
self.dflow_run_directory = dflow_run_directory
os.mkdir(dflow_run_directory)
# write boundary conditions to file
bc_up_path = os.path.join(dflow_run_directory,
'boundriver_up_0001.cmp')
bc_down_path = os.path.join(dflow_run_directory,
'boundriverdown_0001.cmp')
self.upstream_bc.write(bc_up_path)
self.downstream_bc.write(bc_down_path)
self.vegetation_ascii = ESRIAsc(vegetation_map)
veg_path = os.path.join(dflow_run_directory, 'n.pol')
Pol.from_ascii(
veg2n(self.vegetation_ascii,
veg_roughness_shearres_lookup,
streambed_roughness)
).write(veg_path)
oj = os.path.join
pbs_path = oj(dflow_run_directory, pbs_script_name)
mdu_path = oj(dflow_run_directory, 'base.mdu')
net_path = oj(dflow_run_directory, 'base_net.nc')
ext_path = oj(dflow_run_directory, 'base.ext')
brd_path = oj(dflow_run_directory, 'boundriverdown.pli')
bru_path = oj(dflow_run_directory, 'boundriver_up.pli')
self.dflow_shear_output =\
os.path.join(dflow_run_directory,
'DFM_OUTPUT_base',
'base_map.nc')
with open(pbs_path, 'w') as f:
p = _join_data_dir(oj('dflow_inputs', 'dflow_mpi.pbs'))
s = open(p, 'r').read()
f.write(s)
with open(mdu_path, 'w') as f:
p = _join_data_dir(oj('dflow_inputs', 'base.mdu'))
s = open(p, 'r').read()
f.write(s)
with open(net_path, 'w') as f:
p = _join_data_dir(oj('dflow_inputs', 'base_net.nc'))
s = open(p, 'r').read()
f.write(s)
with open(ext_path, 'w') as f:
p = _join_data_dir(oj('dflow_inputs', 'base.ext'))
s = open(p, 'r').read()
f.write(s)
with open(brd_path, 'w') as f:
p = _join_data_dir(oj('dflow_inputs', 'boundriverdown.pli'))
s = open(p, 'r').read()
f.write(s)
with open(bru_path, 'w') as f:
p = _join_data_dir(oj('dflow_inputs', 'boundriver_up.pli'))
s = open(p, 'r').read()
f.write(s)
bkdir = os.getcwd()
os.chdir(dflow_run_directory)
if dflow_run_fun is None:
print '\n*****\nDry Run of DFLOW\n*****\n'
os.chdir(bkdir)
example_shear_path = 'jemez_r02_map.nc'
if os.path.exists(example_shear_path):
os.makedirs(os.path.dirname(self.dflow_shear_output))
shutil.copyfile(example_shear_path, self.dflow_shear_output)
else:
print 'Get you a copy of a DFLOW output, yo! ' +\
'Can\'t run RipCAS without it!'
with open('not_actually_output.nc', 'w') as f:
f.write('A FAKE NETCDF!!!')
self.dflow_has_run = True
else:
# in the case of running a process on CARC, the ret is a Popen inst
ret = dflow_run_fun()
os.chdir(bkdir)
self.dflow_has_run = True
return ret
def run_ripcas(self, zone_map_path, ripcas_required_data_path,
ripcas_directory, shear_asc=None, clobber=True):
if not self.dflow_has_run:
raise RuntimeError(
'DFLOW must run before ripcas can be run'
)
if os.path.exists(ripcas_directory):
if not clobber:
raise RuntimeError(
'DFLOW has already been run for this CoupledRun'
)
shutil.rmtree(ripcas_directory)
self.ripcas_directory = ripcas_directory
os.mkdir(ripcas_directory)
hdr = self.vegetation_ascii.header_dict()
if shear_asc is None:
shear_asc = shear_mesh_to_asc(self.dflow_shear_output, hdr)
else:
assert isinstance(shear_asc, ESRIAsc),\
'shear_asc must be of type ESRIAsc if provided'
shear_asc.write(
os.path.join(self.dflow_run_directory, 'shear_out.asc')
)
output_veg_ascii = ripcas(
self.vegetation_ascii, zone_map_path,
shear_asc, ripcas_required_data_path
)
output_vegetation_path = os.path.join(
ripcas_directory, 'vegetation.asc'
)
output_veg_ascii.write(output_vegetation_path)
self.ripcas_has_run = True
return output_veg_ascii
BoundarySolutionInfo = namedtuple(
'BoundarySolutionInfo', ['ws_elev', 'streamflow', 'error', 'success']
)
BoundarySolutionInfo.__new__.__defaults__ = (None, None, None, None)
class BoundaryConditionSolver:
def __init__(self,
historical_streamflow,
dbc_geometry,
streambed_roughness,
slope):
self.q_hist = historical_streamflow
self.geom = dbc_geometry
self.n = streambed_roughness
self.slope = slope
def solve(self):
def _streamflow_error(ws_elev):
calc =\
_calculate_streamflow(self.geom, self.n, ws_elev, self.slope)
return abs(calc - self.q_hist)
# generate initial guesses with wide-spaced points
result = minimize_scalar(_streamflow_error,
bounds=(self.geom.z.min(), self.geom.z.max()),
method='bounded',
options={'xatol': 1e-6, 'maxiter': 1000})
return BoundarySolutionInfo(
result.x,
_calculate_streamflow(self.geom, self.n, result.x, self.slope),
result.fun,
result.success
)
StreamflowTuple = namedtuple('StreamflowTuple', ['ws_elev', 'streamflow'])
def _calculate_streamflow(dbc_geometry, streambed_roughness,
water_surface_elevation, slope):
# have N points; get N-1 distances and
# N-1 Max/Min over those distances
x = dbc_geometry.x
y = dbc_geometry.y
z = dbc_geometry.z
dx = np.diff(x)
dy = np.diff(y)
xydist = np.sqrt(np.square(dx) + np.square(dy))
# station = np.cumsum(xydist)
zmax_by_segment = np.array(
[max(z[i], z[i+1]) for i in range(len(z)-1)]
)
zmin_by_segment = np.array(
[min(z[i], z[i+1]) for i in range(len(z)-1)]
)
# get N-1 vector taken from S = ('below', 'triangle', 'trap')
# for the three possible positions of the water surface and
# commensurate calculation methods for wetted perimeter
ws_location = np.array(
[
_get_ws_location(water_surface_elevation, _z[0], _z[1])
for _z in zip(zmax_by_segment, zmin_by_segment)
]
)
# calculate the cross-sectional area of the stream
# at the lower bound
area_vec = np.zeros(len(ws_location))
# wetted perimeter
wp_vec = np.zeros(len(ws_location))
ws_elev = water_surface_elevation
for idx, loc in enumerate(ws_location):
if loc == 'triangle':
zmin = zmin_by_segment[idx]
zmax = zmax_by_segment[idx]
xy = xydist[idx]
# calculate area
area_vec[idx] = 0.5 * (ws_elev - zmin) * xy
# calculate wetted perimeter
_da = ((ws_elev - zmin)/(zmax - zmin)) * xy
_db = ws_elev - zmin
wp_vec[idx] = np.sqrt(_da**2.0 + _db**2.0)
elif loc == 'trapezoid':
zmin = zmin_by_segment[idx]
zmax = zmax_by_segment[idx]
xy = xydist[idx]
area_vec[idx] = 0.5 * xy * (2*ws_elev - zmax - zmin)
wp_vec[idx] = np.sqrt(xy**2.0 + (zmax - zmin)**2.0)
area_sum = sum(area_vec)
wp_sum = sum(wp_vec)
n_inv = (1.0/streambed_roughness)
Q = n_inv * area_sum * (pow((area_sum/wp_sum), 2/3.0)) * np.sqrt(slope)
return Q
def _get_ws_location(water_surface_elev, zmax, zmin):
"""
Return one of three values depending on the location of the water surface
relative to the elevations of the discretized cross-section points.
Vectorized below.
Returns:
(str) one of the following: 'below' if above zmax (and automatically
zmin), 'triangle' if between zmax and zmin, and 'trapezoid'
if below zmin. This corresponds to the geometry that will be used
to calculate the wetted perimeter and area of the induced polygon.
"""
if water_surface_elev > zmax:
return 'trapezoid'
elif water_surface_elev <= zmax and water_surface_elev > zmin:
return 'triangle'
else:
return 'below'
class BoundaryCondition:
def __init__(self,
period=0.0, # (minutes)
amplitude=0.0, # (ISO)
phase=0.0): # (deg)
self.period = period
self.amplitude = amplitude
self.phase = phase
def write(self, out_path):
with open(out_path, 'w') as f:
f.write(self.__repr__())
def __repr__(self):
return '\n'.join([
'* COLUMN=3',
'* COLUMN1=Period (min) or Astronomical Componentname',
'* COLUMN2=Amplitude (ISO)',
'* COLUMN3=Phase (deg)',
'{0} {1} {2}'.format(self.period, self.amplitude, self.phase)
])
def mr_log(log_f, msg):
ta = time.asctime
log_f.write('[{0}] '.format(ta()) + msg)
log_f.flush()
os.fsync(log_f.fileno())
[docs]def modelrun_series(data_dir, initial_vegetation_map, vegzone_map,
veg_roughness_shearres_lookup, peak_flows_file,
geometry_file, streambed_roughness,
streambed_floodplain_roughness, streambed_slope,
dflow_run_fun=None, log_f=None, debug=False):
'''
Run a series of flow and succession models with peak flows given in
peak_flows_file.
Arguments:
data_dir (str): write directory for modelrun series. Must exist
initial_vegetation_map (str): location of year zero veg map
vegzone_map (str): vegetation zone map location
veg_roughness_shearres_lookup (str): Excel spreadsheet containing
conversion from vegetation code to roughness value and vegetation
code to shear stress resistance
peak_flow_file (str): location of text file record of peak flows in
cubic meters per second
geometry_file (str): location of channel geometry at the downstream
location for calculating streamflow
streambed_roughness (float): streambed roughness in channel only; used
when converting vegetation map to roughness map
streambed_floodplain_roughness (float): an average roughness of
stream channel and floodplain used in calculation of downstream
boundary condition for DFLOW
streambed_slope (float): rise over run of the channel used in
calculation of downstream boundary condition for DFLOW
dflow_run_fun (function): function delegate for the user to provide a
custom way to run DFLOW. If none is given, defaults to
submitting a PBS job as is done on CARC systems
log_f (str): log file. if none is given, defaults to `data_dir`.log
with dashes replacing slashes
debug (bool): whether or not to run in debug mode. If running in debug
mode, each DFLOW run returns fake data and
each RipCAS run takes cord/data/shear_out.asc as input
returns:
None
'''
if dflow_run_fun is None:
def dflow_fun():
import subprocess
return subprocess.Popen(
'qsub dflow_mpi.pbs', shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
if log_f is None:
first_char = data_dir[0]
root_log_f = first_char if first_char != '/' else ''
root_log_f += data_dir[1:].replace('/', '-')
log_f = open(root_log_f + '.log', 'w')
else:
log_f = open(log_f, 'w')
with open(peak_flows_file, 'r') as f:
l0 = f.readline().strip()
assert l0 == 'Peak.Flood', '{} not Peak.Flood'.format(l0)
peak_flows = [float(l.strip()) for l in f.readlines()]
inputs_dir = os.path.join(data_dir, 'inputs')
if os.path.isdir(inputs_dir):
shutil.rmtree(inputs_dir)
os.mkdir(inputs_dir)
shutil.copy(initial_vegetation_map, inputs_dir)
shutil.copy(vegzone_map, inputs_dir)
shutil.copy(veg_roughness_shearres_lookup, inputs_dir)
shutil.copy(peak_flows_file, inputs_dir)
shutil.copy(geometry_file, inputs_dir)
roughness_slope_path = os.path.join(inputs_dir, 'roughness_slope.txt')
with open(roughness_slope_path, 'w') as f:
f.write('roughness\tslope\n')
f.write('%s\t%s\n' % (streambed_roughness, streambed_slope))
for flow_idx, flow in enumerate(peak_flows):
mr = ModelRun()
mr.calculate_bc(
flow, geometry_file,
streambed_floodplain_roughness, streambed_slope
)
mr_log(
log_f, 'Boundary conditions for flow index {0} finished\n'.format(
flow_idx
)
)
dflow_dir = os.path.join(data_dir, 'dflow-' + str(flow_idx))
if flow_idx == 0:
veg_file = initial_vegetation_map
else:
veg_file = os.path.join(
data_dir, 'ripcas-' + str(flow_idx - 1), 'vegetation.asc'
)
if debug:
mr.run_dflow(dflow_dir, veg_file,
veg_roughness_shearres_lookup, streambed_roughness)
job_id = 'debug'
else:
p_ref = mr.run_dflow(dflow_dir, veg_file,
veg_roughness_shearres_lookup,
streambed_roughness,
dflow_run_fun=dflow_fun)
job_id = p_ref.communicate()[0].split('.')[0]
mr_log(log_f, 'Job ID {0} submitted for DFLOW run {1}\n'.format(
job_id, flow_idx
)
)
# check the status of the job by querying qstat; break loop when
# job no longer exists, giving nonzero poll() value
job_not_finished = True
while job_not_finished:
mr_log(
log_f,
'Job ID {0} not yet finished for DFLOW run {1}\n'.format(
job_id, flow_idx
)
)
if debug:
job_not_finished = False
else:
p = subprocess.Popen(
'qstat ' + job_id, shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
p.communicate()
poll = p.poll()
job_not_finished = poll == 0
time.sleep(600)
mr_log(
log_f, 'DFLOW run {0} finished, starting RipCAS\n'.format(
flow_idx
)
)
ripcas_dir = os.path.join(data_dir, 'ripcas-' + str(flow_idx))
if debug:
p = _join_data_dir('shear_out.asc')
mr.run_ripcas(vegzone_map, veg_roughness_shearres_lookup,
ripcas_dir, shear_asc=ESRIAsc(p))
mr_log(log_f, 'RipCAS run {0} finished\n'.format(flow_idx))
log_f.close()
if __name__ == '__main__':
import sys
help_msg = '''
modelrun.py
Author: Matthew Turner
Usage:
python ripcas_dflow/modelrun.py data_dir initial_vegetation vegzone_map veg_roughness_shearres_lookup\
peak_flows_file geometry_file streambed_roughness streambed_slope
data_dir: directory to hold each time step of the model run
initial_vegetation: .asc file with initial vegetation map
vegzone_map: .asc file with vegetation zone information
veg_roughness_shearres_lookup: .xlsx file with veg type-to-n and shear resistance-per-veg type information
peak_flows_file: file with a column of peak flood flow in cubic meters per second
geometry_file: xyz file representing geometry of downstream cross section for boundary conditions calculation
streambed_roughness: floating point number for the roughness value of the streambed
streambed_slope: floating point number for the slope of the stream geography
'''
if sys.argv[1] == '-h' or sys.argv[1] == '--help':
print help_msg
sys.exit(0)
if len(sys.argv) != 9:
print help_msg
sys.exit(1)
data_dir = sys.argv[1]
initial_vegetation_map = sys.argv[2]
vegzone_map = sys.argv[3]
veg_roughness_shearres_lookup = sys.argv[4]
peak_flows_file = sys.argv[5]
geometry_file = sys.argv[6]
streambed_roughness = float(sys.argv[7])
streambed_slope = float(sys.argv[8])
modelrun_series(data_dir, initial_vegetation_map, vegzone_map,
veg_roughness_shearres_lookup, peak_flows_file, geometry_file,
streambed_roughness, streambed_slope, dflow_run_fun=None,
log_f=None)
def _join_data_dir(f):
'''
Join the filename, f, to the default data directory
'''
data_dir = os.path.join(os.path.dirname(__file__), 'data')
return os.path.join(data_dir, f)