from collections import defaultdict
from typing import Tuple
import tempfile
import timml
import flopy
import numpy as np
from matplotlib.path import Path as MplPath
IntArray = np.ndarray
BoolArray = np.ndarray
def locate_points(points, xgrid, ygrid) -> Tuple[IntArray]:
x, y = np.atleast_2d(points).T
i_x = np.minimum(np.searchsorted(xgrid, x), xgrid.size - 1)
i_y = np.minimum(np.searchsorted(ygrid, y), ygrid.size - 1)
return i_x, i_y
def points_in_circle(points, center, radius) -> BoolArray:
dist_squared = ((points - center) ** 2).sum(axis=1)
return dist_squared < radius**2
def points_in_polygon(points, polygon_xy) -> BoolArray:
return MplPath(polygon_xy).contains_points(points)
def rasterization_indices(xy, xgrid, ygrid) -> Tuple[IntArray]:
"""
Vectorized implementation of Bresenham's line algorithm to burn lines into
a 2D raster.
Parameters
----------
xy: np.ndarray of shape (n_vertex, 2, 2)
xgrid: np.ndarray of shape (n_x)
ygrid: np.ndarray of shape (n_y)
Returns
-------
y_index: np.ndarray of int
x_index: np.ndarray of int
"""
def _bresenhamline_nslope(slope) -> np.ndarray:
scale = np.amax(np.abs(slope), axis=1).reshape(-1, 1)
zeroslope = (scale == 0).all(1)
scale[zeroslope] = 1.0
normalizedslope = slope.astype(float) / scale
normalizedslope[zeroslope] = 0.0
return normalizedslope
def bresenham_lines(start, end) -> IntArray:
max_iter = np.amax(np.amax(np.abs(end - start), axis=1))
_, dim = start.shape
nslope = _bresenhamline_nslope(end - start)
# steps to iterate on
stepseq = np.arange(0, max_iter + 1)
stepmat = np.tile(stepseq, (dim, 1)).T
# some hacks for broadcasting properly
bline = start[:, np.newaxis, :] + nslope[:, np.newaxis, :] * stepmat
# Approximate to nearest int
return np.rint(bline).astype(int).reshape((-1, dim))
i_x, i_y = locate_points(xy, xgrid, ygrid)
start = np.column_stack((i_y[:-1], i_x[:-1]))
end = np.column_stack((i_y[1:], i_x[1:]))
return tuple(bresenham_lines(start=start, end=end).T)
def aquifer_z_data(aquifer) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
index = 1 - aquifer.ilap
top = aquifer.z[index]
bottom = aquifer.z[index + 1 :]
thickness = aquifer.Hlayer[index:]
return top, bottom, thickness
def index3d(i_layer, i_y, i_x):
return (
np.repeat(i_layer, i_x.size),
np.tile(i_y, len(i_layer)),
np.tile(i_x, len(i_layer)),
)
class ModflowConverter:
AQUIFERS = slice(0, None, 2)
LEAKY_LAYERS = slice(1, None, 2)
def __init__(self, timml_model, xmin, ymin, xmax, ymax, cellsize):
"""
Convert a TimML model into a steady-state MODFLOW 6 model.
The solutions of the analytic elements of TimML are infinite in the x-y
plane while MODFLOW uses a discrete finite grid. This grid is defined by:
* xmin
* ymin
* xmax
* ymax
* cellsize
Additionally, when no boundaries are specified, MODFLOW will assume a
no flow boundary on the edges of the grid domain. To get comparable
results, the TimML model should generally be surrounded by e.g.
HeadLineSinks.
This converts the following TimML elements to their MODFLOW equivalent:
* Aquifer and Polygon inhomogeneities:
- z is converted into DIS top and bottom.
- k and c and are converted into NPF k.
- if topboundary="semi", hstar is converted into a GHB stage, and the
first c is converted into a GHB conductance.
- if N is given, it is converted into RCH.
* AreaSink:
- is converted to RCH.
* Well:
- is converted to WEL.
* HeadWell
- if resistance is provided: is converted to GHB.
- else: is converted to CHD.
* HeadLineSink:
- if resistance is provided: is converted to GHB.
- else: is converted to CHD.
* LeakyLineDoublet:
- is burned into the NPF k values, by reducing conductivity.
* LeakyBuildingPit:
- is treated like a PolygonInhom.
- the resistance along the edges are burned into the NPF k values
like for a LeakyLineDoublet.
"""
self.timml_model = timml_model
self.xmin = xmin
self.xmax = xmax
self.ymin = ymin
self.ymax = ymax
self.cellsize = cellsize
self.cell_area = cellsize * cellsize
self.x = np.arange(xmin, xmax, cellsize) + 0.5 * cellsize
self.y = np.arange(ymin, ymax, cellsize) + 0.5 * cellsize
self.yy, self.xx = [
a.ravel() for a in np.meshgrid(self.y, self.x, indexing="ij")
]
self.xy = np.column_stack((self.xx, self.yy))
self.nx = self.x.size
self.ny = self.y.size
# MODFLOW 6 has 3D layers:
self.nlayer = timml_model.aq.naq * 2 - 1
self.shape2d = (self.ny, self.nx)
self.shape = (self.nlayer, self.ny, self.nx)
self._initialize_top_bottom()
self._initialize_k()
self._group_elements()
# This is for aquifer and inhomogeneities top:
self.top_recharge = np.zeros(self.shape)
self.top_cond = np.full(self.shape, np.nan)
self.top_head = np.full(self.shape, np.nan)
# We'll store all other boundaries in these arrays:
self.recharge = np.zeros(self.shape)
self.bound_q = np.zeros(self.shape)
self.bound_cond = np.full(self.shape, np.nan)
self.bound_head = np.full(self.shape, np.nan)
self.constant_head = np.full(self.shape, np.nan)
def _group_elements(self):
self.elements = defaultdict(list)
for element in self.timml_model.elementlist + self.timml_model.aq.inhomlist:
self.elements[type(element).__name__].append(element)
def _initialize_top_bottom(self):
top, bottom, thickness = aquifer_z_data(self.timml_model.aq)
self.top = np.full(self.shape2d, top)
self.bottom = np.broadcast_to(
bottom[:, np.newaxis, np.newaxis], self.shape
).copy()
self.thickness = np.broadcast_to(
thickness[:, np.newaxis, np.newaxis], self.shape
).copy()
def _initialize_k(self):
self.k = np.zeros(self.shape)
self.k[self.AQUIFERS] = self.timml_model.aq.kaq[:, np.newaxis, np.newaxis]
if self.nlayer > 1:
kll = (
self.thickness[self.LEAKY_LAYERS]
/ self.timml_model.aq.c[1:, np.newaxis, np.newaxis]
)
self.k[self.LEAKY_LAYERS, ...] = kll
def _convert_polygoninhomogeneity(self, inhom):
polygon_xy = np.column_stack((inhom.x, inhom.y))
top, bottom, thickness = aquifer_z_data(inhom)
kll = thickness[self.LEAKY_LAYERS] / inhom.c[1:, np.newaxis, np.newaxis]
xy_indices = points_in_polygon(self.xy, polygon_xy).reshape(self.shape2d)
self.top[xy_indices] = top
self.bottom[:, xy_indices] = bottom
self.thickness[:, xy_indices] = thickness
self.k[self.AQUIFERS, xy_indices] = inhom.kaq
if self.nlayer > 1:
self.k[self.LEAKY_LAYERS, xy_indices] = kll
N = getattr(inhom, "N", None)
if N is None:
N = 0
self.top_recharge[0, xy_indices] = N
hstar = inhom.hstar
cond = self.cell_area / inhom.c[0]
if hstar is None:
hstar = np.nan
cond = np.nan
self.top_head[0, xy_indices] = hstar
self.top_cond[0, xy_indices] = cond
return
def convert_polygoninhomogenity(self):
for inhom in self.elements["PolygonInhomMaq"]:
self._convert_polygoninhomogeneity(inhom)
def convert_circareasink(self):
for circ in self.elements["CircAreaSink"]:
xy_indices = points_in_circle(self.xy, (circ.xc, circ.yc), circ.R).reshape(
self.shape2d
)
self.recharge[circ.layers * 2, xy_indices] = circ.N
return
def _burn_resistance(self, xy, layers, res):
i_y, i_x = rasterization_indices(xy, self.x, self.y)
i_layer = layers * 2
i_layer, i_y, i_x = index3d(i_layer, i_y, i_x)
k_grid = self.k[i_layer, i_y, i_x]
effective_k = self.cellsize / (self.cellsize / k_grid + res)
self.k[i_layer, i_y, i_x] = effective_k
return
def _convert_linedoublet(self, doublet):
xy = [(doublet.x1, doublet.y1), (doublet.x2, doublet.y2)]
self._burn_resistance(xy, doublet.layers, doublet.res)
return
def convert_linedoublet(self):
for doublet in self.elements["LeakyLineDoublet"]:
self._convert_linedoublet(doublet)
return
def convert_linedoubletstring(self):
for doubletstring in self.elements["LeakyLineDoubletString"]:
for doublet in doubletstring.ldlist:
self._convert_linedoublet(doublet)
return
def convert_leakybuildingpit(self):
for pit in self.elements["LeakyBuildingPit"]:
self._convert_polygoninhomogeneity(pit)
xy = np.column_stack((pit.x, pit.y))
self._burn_resistance(xy, pit.layers, pit.res)
return
def _convert_linesink(self, sink):
xy = [(sink.x1, sink.y1), (sink.x2, sink.y2)]
i_y, i_x = rasterization_indices(xy, self.x, self.y)
i_layer = sink.layers * 2
i_layer, i_y, i_x = index3d(i_layer, i_y, i_x)
# FUTURE: interpolate head along the length of the linesink
if sink.res > 0:
self.bound_cond[i_layer, i_y, i_x] = sink.wh / sink.res
self.bound_head[i_layer, i_y, i_x] = sink.hls[0]
else:
self.constant_head[i_layer, i_y, i_x] = sink.hls[0]
return
def convert_linesink(self):
for sink in self.elements["HeadLineSink"]:
self._convert_linesink(sink)
return
def convert_linesinkstring(self):
for sinkstring in self.elements["HeadLineSinkString"]:
for sink in sinkstring.lslist:
self._convert_linesink(sink)
return
def convert_well(self):
for well in self.elements["Well"]:
i_x, i_y = locate_points([well.xw, well.yw], self.x, self.y)
i_layer = well.layers * 2
self.bound_q[i_layer, i_y, i_x] = -well.Qw
return
def convert_headwell(self):
for well in self.elements["HeadWell"]:
i_x, i_y = locate_points([well.xw, well.yw], self.x, self.y)
i_layer = well.layers * 2
if well.resfac > 0:
self.bound_head[i_layer, i_y, i_x] = well.hc
self.bound_cond[i_layer, i_y, i_x] = 1.0 / well.resfac
else:
self.constant_head[i_layer, i_y, i_x] = well.hc
return
def convert(self):
self.convert_polygoninhomogenity()
self.convert_leakybuildingpit()
self.convert_linesink()
self.convert_linesinkstring()
self.convert_linedoublet()
self.convert_linedoubletstring()
self.convert_headwell()
self.convert_well()
self.convert_circareasink()
return
def to_mf6(self):
def cell_ids(valid):
return [tuple(i) for i in np.argwhere(valid)]
constant_head = self.constant_head
top_recharge = self.top_recharge
top_head = self.top_head
top_cond = self.top_cond
recharge = self.recharge
bound_q = self.bound_q
bound_head = self.bound_head
bound_cond = self.bound_cond
nlay, nrow, ncol = self.shape
workspace = tempfile.TemporaryDirectory()
print(workspace)
sim = flopy.mf6.MFSimulation(
# sim_name="TimmlConverted", sim_ws=".", exe_name="mf6")
sim_name="TimmlConverted",
sim_ws=workspace.name,
exe_name="mf6",
)
flopy.mf6.ModflowTdis(sim, nper=1)
flopy.mf6.ModflowIms(
sim,
outer_dvclose=1.0e-5,
outer_maximum=3, # Linear problem, should be able to solve in one cg-solve...
inner_maximum=200,
inner_dvclose=1.0e-6,
)
gwf = flopy.mf6.ModflowGwf(sim, modelname="gwf", save_flows=True)
flopy.mf6.ModflowGwfdis(
gwf,
nlay=nlay,
nrow=nrow,
ncol=ncol,
delr=self.cellsize,
delc=self.cellsize,
top=self.top,
botm=self.bottom,
)
flopy.mf6.ModflowGwfnpf(
gwf,
icelltype=0, # confined
k=self.k,
k33=self.k,
)
flopy.mf6.ModflowGwfic(gwf, strt=0.0)
# Setup recharge for the top
valid_rch = top_recharge != 0
if valid_rch.any():
rch_top_stress = {0: [(i, top_recharge[i]) for i in cell_ids(valid_rch)]}
flopy.mf6.ModflowGwfrch(gwf, stress_period_data=rch_top_stress)
# Setup general head boundary for the top
valid_ghb = ~np.isnan(top_cond)
if valid_ghb.any():
ghb_top_stress = {
0: [(i, top_head[i], top_cond[i]) for i in cell_ids(valid_ghb)]
}
flopy.mf6.ModflowGwfghb(gwf, stress_period_data=ghb_top_stress)
# Setup constant head
valid_chd = ~np.isnan(constant_head)
if valid_chd.any():
chd_stress = {0: [(i, constant_head[i]) for i in cell_ids(valid_chd)]}
flopy.mf6.ModflowGwfchd(gwf, stress_period_data=chd_stress)
# Setup recharge
valid_rch = recharge != 0
if valid_rch.any():
rch_stress = {0: [(i, recharge[i]) for i in cell_ids(valid_rch)]}
flopy.mf6.ModflowGwfrch(gwf, stress_period_data=rch_stress)
# Setup general head boundary
valid_ghb = ~np.isnan(bound_cond)
if valid_ghb.any():
ghb_stress = {
0: [(i, bound_head[i], bound_cond[i]) for i in cell_ids(valid_ghb)]
}
flopy.mf6.ModflowGwfghb(gwf, stress_period_data=ghb_stress)
# Setup wells for fixed discharge boundaries
valid_wel = bound_q != 0
if valid_wel.any():
wel_stress = {0: [(tuple(i), bound_q[i]) for i in cell_ids(valid_wel)]}
flopy.mf6.ModflowGwfwel(gwf, stress_period_data=wel_stress)
flopy.mf6.ModflowGwfoc(
gwf,
head_filerecord="gwf.hds",
saverecord=[("HEAD", "ALL")],
)
return sim, gwf
Hans and Jan had run into a case where the leaky line doublet seemed to give different answers compared to MWELL and a checking model MODFLOW model. I tried to make a parametrized / reproducible example in flopy... and I quickly found out that @dbrakenhoff had already reported and fixed this, see #71.
Anyway, the script I'd made has most of the logic for converting a TimML ModelMaq instance to a Flopy Modflow6 model. Here's the implementation of a
ModflowConverter, it doesn't require anything fancy in terms of dependencies, just numpy, flopy, and matplotlib:Details
<\details>