Reference

pyposeidon.meteo

Meteo module. Pre-processes the weather forcing component.

Meteo
Read meteo data from various sources.
Source code in pyposeidon/meteo.py
class Meteo:
"""
Read meteo data from various sources.
"""
def __init__(self, meteo_source: Optional[str] = None, **kwargs):
"""
!!! danger ""
Due to a limitation of the Library rendering the docstrings, all arguments are marked
as `required`, nevertheless they are all `Optional`.
Args:
meteo_source str: Path or url to meteo data.
lon_min float: Minimum longitude.
lon_max float: Maximum longitude.
lat_min float: Minimum latitude.
lat_max float: Maximum latitude.
geometry Union[dict, str, GeoDataFrame]: A `GeoDataFrame` or the path to a shapefile or
a dict defining the lat/lon window.
meteo_merge bool: Flag for performing data merging. Defaults to `False`.
meteo_combine_by str: Define option for data merging. Defaults to `by_coords`.
start_date str: The date from which the analysis should start. It should be a string parseable
by `pd.to_datetime()`.
end_date str: The date at which the analysis should end. It should be a string parseable by
`pd.to_datetime()`.
time_frame str: The duration of the analysis. It should be a string parseable by
`pd.to_timedelta()`.
meteo_backend_kwargs dict: Args passed to `xarray`. Defaults to `{"indexpath": ""}`.
meteo_xr_kwargs dict: Args passed to `xarray`. Defaults to `{}`.
meteo_irange list: Select the range and step of time coordinate to be used. Defaults to `[0, -1, 1]`.
"""
# integrate geometry attribute.
geometry = kwargs.get("geometry", None)
if geometry:
kwargs.update(**geometry)
if isinstance(meteo_source, pathlib.Path):
meteo_source = meteo_source.as_posix()
meteo_func = dispatch_meteo_source(meteo_source)
self.Dataset = meteo_func(meteo_source=meteo_source, **kwargs)
def to_output(self, solver_name, **kwargs):
solver = tools.get_solver(solver_name)
var_list = kwargs.pop("vars", ["msl", "u10", "v10"])
m_index = get_value(self, kwargs, "m_index", 1)
split_by = get_value(self, kwargs, "meteo_split_by", None)
if split_by:
times, datasets = zip(*self.Dataset.groupby("time.{}".format(split_by)))
mpaths = ["sflux/sflux_air_{}.{:04d}.nc".format(m_index, t + 1) for t in np.arange(len(times))]
for das, mpath in list(zip(datasets, mpaths)):
solver.to_force(das, vars=var_list, filename=mpath, **kwargs)
else:
solver.to_force(self.Dataset, vars=var_list, **kwargs)
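A minimal usage sketch (the GRIB path, bounding box and output path below are placeholders, not values shipped with pyposeidon):

```python
from pyposeidon.meteo import Meteo

# hypothetical ERA5 GRIB file; anything accepted by dispatch_meteo_source() works
m = Meteo(
    meteo_source="./era5_uvp.grib",
    lon_min=-10.0,
    lon_max=5.0,
    lat_min=30.0,
    lat_max=45.0,
)
m.Dataset  # xarray Dataset holding the meteo variables
m.to_output(solver_name="schism", rpath="./schism/")  # write sflux netcdf file(s)
```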
__init__(self, meteo_source=None, **kwargs) special

Due to a limitation of the Library rendering the docstrings, all arguments are marked as `required`, nevertheless they are all `Optional`.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
meteo_source | str | Path or url to meteo data. | None |
lon_min | float | Minimum longitude. | required |
lon_max | float | Maximum longitude. | required |
lat_min | float | Minimum latitude. | required |
lat_max | float | Maximum latitude. | required |
geometry | Union[dict, str, GeoDataFrame] | A `GeoDataFrame` or the path to a shapefile or a dict defining the lat/lon window. | required |
meteo_merge | bool | Flag for performing data merging. Defaults to `False`. | required |
meteo_combine_by | str | Define option for data merging. Defaults to `by_coords`. | required |
start_date | str | The date from which the analysis should start. It should be a string parseable by `pd.to_datetime()`. | required |
end_date | str | The date at which the analysis should end. It should be a string parseable by `pd.to_datetime()`. | required |
time_frame | str | The duration of the analysis. It should be a string parseable by `pd.to_timedelta()`. | required |
meteo_backend_kwargs | dict | Args passed to `xarray`. Defaults to `{"indexpath": ""}`. | required |
meteo_xr_kwargs | dict | Args passed to `xarray`. Defaults to `{}`. | required |
meteo_irange | list | Select the range and step of time coordinate to be used. Defaults to `[0, -1, 1]`. | required |
dispatch_meteo_source(obj)

Choose the appropriate function to handle `obj`. `obj` can be any of the following:

- `str`
- `Path`
- `list[str]`
- `list[Path]`
- `None`
- `URL`
Source code in pyposeidon/meteo.py
def dispatch_meteo_source(obj) -> Callable[..., xr.Dataset]:
"""Choose the appropriate function to handle `obj`
`obj` can be any of the following:
- `str`
- `Path`
- `list[str]`
- `list[Path]`
- `None`
- `URL`
"""
original = obj
obj = tools.cast_path_to_str(obj)
if tools.is_iterable(obj) and not isinstance(obj, str) and not isinstance(obj, xr.Dataset):
if len({type(o) for o in obj}) != 1:
raise ValueError(f"Multiple types in iterable: {obj}")
obj = tools.cast_path_to_str(obj[0])
if obj is None:
meteo_func = empty
elif isinstance(obj, xr.Dataset):
meteo_func = passthrough
elif isinstance(obj, str):
if obj.lower().endswith("grib"):
meteo_func = cfgrib
elif obj.lower().startswith("http"):
meteo_func = from_url
elif obj.lower().endswith("nc"):
meteo_func = netcdf
else:
raise ValueError(f"Can't determine meteo_type from string: {obj}")
else:
raise ValueError(f"Can't determine meteo_type: {type(original)} - {original}")
return meteo_func
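To make the dispatch rules concrete, a few illustrative calls (note that the `grib` suffix test runs before the `http` prefix test, so a URL ending in `grib` is routed to `cfgrib`; `empty`, `passthrough`, `cfgrib`, `netcdf` and `from_url` are the module-level handlers referenced in the source above, and the file names are placeholders):

```python
import xarray as xr
import pyposeidon.meteo as pmeteo

assert pmeteo.dispatch_meteo_source(None) is pmeteo.empty                    # no source
assert pmeteo.dispatch_meteo_source(xr.Dataset()) is pmeteo.passthrough      # ready-made Dataset
assert pmeteo.dispatch_meteo_source("./data/era5.grib") is pmeteo.cfgrib     # by suffix
assert pmeteo.dispatch_meteo_source("./data/era5.nc") is pmeteo.netcdf
assert pmeteo.dispatch_meteo_source("https://example.com/forecast") is pmeteo.from_url
assert pmeteo.dispatch_meteo_source(["a.grib", "b.grib"]) is pmeteo.cfgrib   # first item decides
```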
pyposeidon.dem

Dem module

Dem

Read bathymetric data from various sources.
Source code in pyposeidon/dem.py
class Dem:
"""
Read bathymetric data from various sources.
"""
def __init__(self, dem_source: str, **kwargs):
"""
!!! danger ""
Due to a limitation of the Library rendering the docstrings, all arguments are marked
as `required`, nevertheless they are all `Optional` except `dem_source`.
Args:
dem_source str: Path or url to bathymetric data.
lon_min float: Minimum longitude.
lon_max float: Maximum longitude.
lat_min float: Minimum latitude.
lat_max float: Maximum latitude.
geometry Union[dict, str, GeoDataFrame]: A `GeoDataFrame` or the path to a shapefile or
a dict defining the lat/lon window.
coastlines Union[str, GeoDataFrame]: A `GeoDataFrame` or the path to a shapefile which
describes the coastlines. Defaults to `None`.
adjust_dem bool: Option to match the dem with the coastlines. Defaults to `True`.
grid_x list[float]: Array of longitude of mesh nodes.
grid_y list[float]: Array of latitude of mesh nodes.
wet_only bool: Flag to mask dry values when interpolating on mesh. Defaults to `False`.
"""
# integrate geometry attribute.
geometry = kwargs.get("geometry", None)
if isinstance(geometry, dict):
kwargs.update(**geometry)
self.Dataset = dem_(source=dem_source, **kwargs)
# print('CHECK2 :','adjusted' in self.Dataset.data_vars.keys())
# print(self.Dataset.data_vars.keys())
if kwargs.get("adjust_dem", True):
coastline = kwargs.get("coastlines", None)
if coastline is None:
logger.warning("coastlines not present, aborting adjusting dem\n")
elif "adjusted" in self.Dataset.data_vars.keys():
logger.info("Dem already adjusted\n")
elif "fval" in self.Dataset.data_vars.keys():
logger.info("Dem already adjusted\n")
else:
self.adjust(coastline)
def adjust(self, coastline, **kwargs):
self.Dataset = fix(self.Dataset, coastline, **kwargs)
try:
check = np.isnan(self.Dataset.adjusted).sum() == 0
except:
check = np.isnan(self.Dataset.fval).sum() == 0
if not check:
logger.warning("Adjusting dem failed, keeping original values\n")
self.Dataset = self.Dataset.drop_vars("adjusted")
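A minimal usage sketch (paths are hypothetical; any bathymetry readable by `xarray` and any shapefile readable by `geopandas` should work):

```python
from pyposeidon.dem import Dem

d = Dem(
    dem_source="./gebco.nc",        # hypothetical bathymetry file
    lon_min=-10.0,
    lon_max=5.0,
    lat_min=30.0,
    lat_max=45.0,
    coastlines="./coastlines.shp",  # hypothetical; enables the adjust step
)
d.Dataset  # xarray Dataset; gains an "adjusted" variable when matching succeeds
```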
__init__(self, dem_source, **kwargs) special

Due to a limitation of the Library rendering the docstrings, all arguments are marked as `required`, nevertheless they are all `Optional` except `dem_source`.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
dem_source | str | Path or url to bathymetric data. | required |
lon_min | float | Minimum longitude. | required |
lon_max | float | Maximum longitude. | required |
lat_min | float | Minimum latitude. | required |
lat_max | float | Maximum latitude. | required |
geometry | Union[dict, str, GeoDataFrame] | A `GeoDataFrame` or the path to a shapefile or a dict defining the lat/lon window. | required |
coastlines | Union[str, GeoDataFrame] | A `GeoDataFrame` or the path to a shapefile which describes the coastlines. Defaults to `None`. | required |
adjust_dem | bool | Option to match the dem with the coastlines. Defaults to `True`. | required |
grid_x | list[float] | Array of longitude of mesh nodes. | required |
grid_y | list[float] | Array of latitude of mesh nodes. | required |
wet_only | bool | Flag to mask dry values when interpolating on mesh. Defaults to `False`. | required |
normalize_coord_names(dataset)
Return a dataset with coords containing "longitude" and "latitude".
Source code in pyposeidon/dem.py
def normalize_coord_names(dataset: xr.Dataset) -> xr.Dataset:
"""Return a dataset with coords containing "longitude" and "latitude" """
coords = set(dataset.coords.keys())
# longitude
for lon_name in LONGITUDE_NAMES:
if lon_name in coords:
break
else:
raise ValueError(f"Couldn't normalize longitude: {coords}")
# latitude
for lat_name in LATITUDE_NAMES:
if lat_name in coords:
break
else:
raise ValueError(f"Couldn't normalize latitude: {coords}")
dataset = dataset.rename({lon_name: "longitude", lat_name: "latitude"})
return dataset
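For instance, assuming the names `"lon"`/`"lat"` are included in `LONGITUDE_NAMES`/`LATITUDE_NAMES`:

```python
import numpy as np
import xarray as xr
from pyposeidon.dem import normalize_coord_names

ds = xr.Dataset(
    {"elevation": (("lat", "lon"), np.zeros((2, 3)))},
    coords={"lat": [10.0, 11.0], "lon": [0.0, 1.0, 2.0]},
)
ds = normalize_coord_names(ds)
sorted(ds.coords)  # ['latitude', 'longitude']
```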
to_output(dataset, solver_name, **kwargs)

Due to a limitation of the Library rendering the docstrings, all arguments are marked as `required`.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
dataset | Dataset | An `xarray` Dataset. | required |
solver_name | str | Name of solver used, e.g. `d3d` or `schism`. | required |
Source code in pyposeidon/dem.py
def to_output(dataset, solver_name, **kwargs):
"""
!!! danger ""
Due to a limitation of the Library rendering the docstrings, all arguments are marked
as `required`.
Args:
dataset Dataset: An `xarray` Dataset.
solver_name str: Name of solver used, e.g. `d3d` or `schism`.
"""
solver_class = tools.get_solver(solver_name=solver_name)
if solver_name == "d3d":
solver_class.to_dep(dataset, **kwargs)
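Continuing the `Dem` sketch above, exporting the bathymetry for Delft3D might look like this (extra kwargs are simply forwarded to the solver):

```python
from pyposeidon.dem import to_output

to_output(d.Dataset, solver_name="d3d")  # delegates to the d3d solver's to_dep()
```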
pyposeidon.boundary

Geometry module of pyposeidon. It manages model boundaries.

Boundary
Source code in pyposeidon/boundary.py
class Boundary:
def __init__(self, **kwargs):
"""
Set model boundaries
!!! danger ""
Due to a limitation of the Library rendering the docstrings, all arguments are marked
as `required`, nevertheless they are all `Optional` except geometry.
Args:
geometry Union[dict, str, GeoDataFrame]: A `GeoDataFrame` or the path to a shapefile or
a dict defining the lat/lon window.
coastlines Union[str, GeoDataFrame]: A `GeoDataFrame` or the path to a shapefile which
describes the coastlines. Defaults to `None`.
cbuffer float: The buffer in arcs for extending the coastlines. Defaults to `None`.
blevels list[float]: The range of DEM values for extracting the boundaries.
When one value is present it defines inner coastlines. When two values exist they define
the extent. Defaults to `None`.
dem_source str: Path or url to bathymetric data.
"""
geometry = kwargs.get("geometry", None)
coastlines = kwargs.get("coastlines", None)
cbuffer = kwargs.get("cbuffer", None)
blevels = kwargs.get("blevels", None)
prad = kwargs.get("R", 1.0)
# COASTLINES
if coastlines is None:
logger.warning("coastlines not given")
self.coasts = None
elif isinstance(coastlines, str):
logger.info("reading {}".format(coastlines))
coasts = gp.GeoDataFrame.from_file(coastlines)
self.coasts = simplify(coasts)
elif isinstance(coastlines, gp.GeoDataFrame):
logger.warning("coastlines is not a file, trying with geopandas Dataset")
try:
self.coasts = simplify(coastlines)
except:
logger.error("coastlines argument not valid ")
sys.exit(1)
# GEOMETRY
if geometry is None:
if blevels is None:
logger.error("neither geometry nor blevels is given, exiting")
sys.exit(1)
if isinstance(geometry, dict):
if self.coasts is None:
logger.warning("coastlines might be required")
self.geometry = geometry
elif isinstance(geometry, str):
if geometry == "global":
if self.coasts is None:
logger.warning("coastlines might be required")
self.geometry = "global"
else:
try:
self.geometry = gp.read_file(geometry)
except:
logger.warning("geometry is not a file, trying with geopandas Dataset")
if isinstance(geometry, gp.GeoDataFrame):
self.geometry = geometry
else:
logger.error("geometry argument not valid ")
sys.exit(1)
else:
try:
self.geometry = gp.read_file(geometry)
except:
logger.warning("geometry is not a file, trying with geopandas Dataset")
if isinstance(geometry, gp.GeoDataFrame):
self.geometry = geometry
else:
logger.error("geometry argument not valid ")
sys.exit(1)
# Define internal boundary as isovalue of DEM
if blevels:
dsource = kwargs.get("dem_source", None)
if dsource is None:
logger.error("dem_source is required")
dem = pdem.Dem(geometry=self.geometry, dem_source=dsource)
dem_ = dem.Dataset
self.coasts = get_dem_contours(blevels, dem_)
# get boundaries
if isinstance(self.geometry, dict):
df = tag(self.geometry, self.coasts, cbuffer, blevels)
elif isinstance(self.geometry, str):
if self.coasts is None:
logger.error("coastlines are missing .. exiting\n")
sys.exit(1)
df = global_tag(self.coasts, cbuffer, blevels, R=prad)
elif isinstance(self.geometry, gp.GeoDataFrame):
df = self.geometry
# line tag
df.loc[df.tag == "island", "lindex"] = np.arange(-df[df.tag == "island"].shape[0], 0).tolist() or 0
df.loc[df.tag == "land", "lindex"] = (1000 + np.arange(1, df[df.tag == "land"].shape[0] + 1)).tolist() or 0
df.loc[df.tag == "open", "lindex"] = np.arange(1, df[df.tag == "open"].shape[0] + 1).tolist() or 0
df = df.sort_values("lindex", ascending=False)
df.lindex = df.lindex.astype(int)
# number of points
df["nps"] = df.apply(lambda row: len(row.geometry.xy[1]) - 1, axis=1)
self.contours = df.reset_index(drop=True)
def show(self):
return self.contours.plot(
column="tag",
legend=True,
legend_kwds={
"loc": "upper center",
"bbox_to_anchor": (0.5, 1.15),
"ncol": 3,
"fancybox": True,
"shadow": True,
},
)
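A minimal usage sketch (window values and the shapefile path are placeholders):

```python
from pyposeidon.boundary import Boundary

window = {"lon_min": -10.0, "lon_max": 5.0, "lat_min": 30.0, "lat_max": 45.0}
b = Boundary(geometry=window, coastlines="./coastlines.shp")  # hypothetical shapefile
b.contours  # GeoDataFrame with tag (open/land/island), lindex and nps per segment
b.show()    # quick plot of the tagged boundaries
```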
__init__(self, **kwargs) special

Set model boundaries

Due to a limitation of the Library rendering the docstrings, all arguments are marked as `required`, nevertheless they are all `Optional` except `geometry`.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
geometry | Union[dict, str, GeoDataFrame] | A `GeoDataFrame` or the path to a shapefile or a dict defining the lat/lon window. | required |
coastlines | Union[str, GeoDataFrame] | A `GeoDataFrame` or the path to a shapefile which describes the coastlines. Defaults to `None`. | required |
cbuffer | float | The buffer in arcs for extending the coastlines. Defaults to `None`. | required |
blevels | list[float] | The range of DEM values for extracting the boundaries. When one value is present it defines inner coastlines. When two values exist they define the extent. Defaults to `None`. | required |
dem_source | str | Path or url to bathymetric data. | required |
pyposeidon.mesh

Mesh module

r2d

Regular 2d grid for d3d
Source code in pyposeidon/mesh.py
class r2d:
"""Regular 2d grid for d3d"""
def __init__(self, **kwargs):
mesh_file = kwargs.get("mesh_file", None)
if mesh_file:
self.Dataset = self.read_file(mesh_file)
else:
geometry = kwargs.get("geometry", None)
try:
lon_min = geometry["lon_min"]
lon_max = geometry["lon_max"]
lat_min = geometry["lat_min"]
lat_max = geometry["lat_max"]
except:
logger.error("geometry not set properly\n")
sys.exit(1)
resolution = kwargs.get("resolution", None)
ni = int(round((lon_max - lon_min) / resolution)) # these are cell numbers
nj = int(round((lat_max - lat_min) / resolution))
lon_max = lon_min + ni * resolution # adjust max lon to match the grid
lat_max = lat_min + nj * resolution
# set the grid
x = np.linspace(lon_min, lon_max, ni)
y = np.linspace(lat_min, lat_max, nj)
gx, gy = np.meshgrid(x, y)
attrs = kwargs.get(
"attrs",
{
"Coordinate System": "Spherical",
"alfori": 0.0,
"xori": 0.0,
"yori": 0.0,
},
)
g = xr.Dataset(
{"lons": (["y", "x"], gx), "lats": (["y", "x"], gy)},
coords={"x": ("x", gx[0, :]), "y": ("y", gy[:, 0])},
)
g.attrs = attrs
self.Dataset = g
@staticmethod
def read_file(filename, **kwargs):
logger.info("read grid file {}".format(filename))
header = pd.read_csv(filename, nrows=3, header=None, comment="*")
cs = header.loc[0, 0].split("=")[1].strip()
ni, nj = header.loc[1, 0].split(" ")
ni, nj = int(ni), int(nj)
alfori, xori, yori = header.loc[2, 0].split(" ")
d = pd.read_csv(
filename,
header=2,
comment="*",
delim_whitespace=True,
engine="python",
na_values="ETA=",
)
d = d.reset_index()
data = d.values[~np.isnan(d.values)]
data = np.array(data)
data = data.reshape(2, nj, ni + 1) # including the row index
# clean up the row index
data = data[:, :, 1:]
lons = data[0, :, :]
lats = data[1, :, :]
g = xr.Dataset(
{"lons": (["y", "x"], lons), "lats": (["y", "x"], lats)},
coords={"x": ("x", lons[0, :]), "y": ("y", lats[:, 0])},
)
g.attrs = {
"Coordinate System": cs,
"alfori": alfori,
"xori": xori,
"yori": yori,
}
return g
def to_file(self, filename, **kwargs):
logger.info("writing grid to file {}".format(filename))
with open(filename, "w") as f:
f.write("Coordinate System= {}\n".format(self.Dataset.attrs["Coordinate System"]))
f.write("{} {}\n".format(self.Dataset.lons.shape[1], self.Dataset.lons.shape[0]))
f.write(
"{} {} {}\n".format(
self.Dataset.attrs["xori"],
self.Dataset.attrs["yori"],
self.Dataset.attrs["alfori"],
)
)
for i in range(self.Dataset.lons.shape[0]):
f.write("ETA= {} ".format(i + 1))
f.write(" ".join(map(str, self.Dataset.lons[i, :].values)))
f.write("\n")
for i in range(self.Dataset.lats.shape[0]):
f.write("ETA= {} ".format(i + 1))
f.write(" ".join(map(str, self.Dataset.lats[i, :].values)))
f.write("\n")
tri2d

Handle an unstructured triangular 2d mesh.

Due to a limitation of the Library rendering the docstrings, all arguments are marked as `required`, nevertheless they are all `Optional` except `geometry`.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
mesh_file | str | Path to `hgrid.gr3`, `jigsaw.msh` or `gmsh.msh` file. Defaults to `None`. | required |
mesh_generator | str | Use `jigsaw` or `gmsh`. Defaults to `None`. | required |
geometry | Union[dict, str, GeoDataFrame] | A `GeoDataFrame` or the path to a shapefile or a dict defining the lat/lon window. Defaults to `None`. | required |
coastlines | Union[str, GeoDataFrame] | A `GeoDataFrame` or the path to a shapefile which describes the coastlines. Defaults to `None`. | required |
boundary | GeoDataFrame | Defined mesh boundaries. Defaults to `None`. | required |
Source code in pyposeidon/mesh.py
class tri2d:
"""
Handle an unstructured triangular 2d mesh.
!!! danger ""
Due to a limitation of the Library rendering the docstrings, all arguments are marked
as `required`, nevertheless they are all `Optional` except geometry.
Args:
mesh_file str: Path to `hgrid.gr3`, `jigsaw.msh` or `gmsh.msh` file. Defaults to `None`.
mesh_generator str: Use `jigsaw` or `gmsh`. Defaults to `None`.
geometry Union[dict, str, GeoDataFrame]: A `GeoDataFrame` or the path to a shapefile or
a dict defining the lat/lon window. Defaults to `None`.
coastlines Union[str, GeoDataFrame]: A `GeoDataFrame` or the path to a shapefile which
describes the coastlines. Defaults to `None`.
boundary GeoDataFrame: Defined mesh boundaries. Defaults to `None`.
"""
def __init__(self, **kwargs):
mesh_file = kwargs.get("mesh_file", None)
mesh_generator = kwargs.get("mesh_generator", None)
geo = kwargs.get("geometry", None)
coasts = kwargs.get("coastlines", None)
boundary = kwargs.get("boundary", None)
if geo == "global":
kwargs.update({"gglobal": True})
if mesh_file:
if mesh_file.endswith(".gr3"):
self.Dataset = self.read_file(mesh_file)
elif mesh_file.endswith(".msh"):
if mesh_generator == "jigsaw":
self.Dataset = mjigsaw.read_msh(mesh_file)
elif mesh_generator == "gmsh":
self.Dataset = mgmsh.read_msh(mesh_file)
else:
raise ValueError("Please define 'mesh_genarator' argument")
elif mesh_generator == "gmsh":
if boundary is None:
self.boundary = Boundary(**kwargs)
else:
self.boundary = boundary
g, bg = mgmsh.get(self.boundary.contours, **kwargs) # create mesh with GMSH
self.Dataset = g
self.bgmesh = bg
elif mesh_generator == "jigsaw":
if boundary is None:
self.boundary = Boundary(**kwargs)
else:
self.boundary = boundary
g, bg = mjigsaw.get(self.boundary.contours, **kwargs) # create mesh with JIGSAW
self.Dataset = g
self.bgmesh = bg
else:
self.Dataset = None
@staticmethod
def read_file(hgrid, **kwargs):
logger.info("read mesh file {}".format(hgrid))
# read file
df = pd.read_csv(hgrid, header=0, low_memory=False)
df = df.dropna(axis=1)
df.columns = ["data"]
# extract number of elements, number of nodes
ni, nj = df.iloc[0].str.split()[0]
ni = int(ni)
nj = int(nj)
# read lon,lat,depth for all nodes
q = pd.DataFrame(df.loc[1:nj, "data"].str.split().values.tolist())
q = q.drop(q.columns[0], axis=1)
q = q.apply(pd.to_numeric)
# q.reset_index(inplace=True, drop=True)
q.columns = ["SCHISM_hgrid_node_x", "SCHISM_hgrid_node_y", "depth"]
q.index.name = "nSCHISM_hgrid_node"
# create xarray of grid
mesh = q.loc[:, ["SCHISM_hgrid_node_x", "SCHISM_hgrid_node_y"]].to_xarray()
mesh = mesh.drop_vars("nSCHISM_hgrid_node")
# create xarray of depth
depth = q.loc[:, "depth"].to_xarray()
depth = depth.drop_vars("nSCHISM_hgrid_node")
# read connectivity
e = pd.DataFrame(df.loc[nj + 1 : nj + ni, "data"].str.split().values.tolist())
e = e.drop(e.columns[0], axis=1)
e = e.apply(pd.to_numeric)
ncolumns = e.loc[:, e.columns[0]].max()
if ncolumns == 3:
e.columns = ["nv", "a", "b", "c"]
else:
e.columns = ["nv", "a", "b", "c", "d"]
e.loc[:, e.columns[1:]] = e.loc[:, e.columns[1:]].values - 1 # convert to python (index starts from 0)
# create xarray of tessellation
els = xr.DataArray(
e.loc[:, e.columns[1:]].values,
dims=["nSCHISM_hgrid_face", "nMaxSCHISM_hgrid_face_nodes"],
name="SCHISM_hgrid_face_nodes",
)
# Open boundaries
n0 = df[df.data.str.contains("open boundaries")].index
n0 = n0.values[0]
nob = df.loc[n0, "data"].split("=")[0].strip()
nob = int(nob)
nobn = df.loc[n0 + 1, "data"].split("=")[0].strip()
nobn = int(nobn)
odic = {}
ottr = []
idx = n0 + 2
for nl in range(nob):
nn = df.loc[idx, "data"].split("=")[0].strip()
nn = int(nn)
label = df.loc[idx, "data"].split("=")[1]
label = label[label.index("open") :]
ottr.append([nn, label])
nodes = df.loc[idx + 1 : idx + nn, "data"].astype("int")
idx = idx + nn + 1
oi = pd.DataFrame({"node": nodes, "type": "open", "id": int(label[-1])})
odic.update({label: oi})
try:
dfo = pd.concat(odic).droplevel(0).reset_index(drop=True)
except ValueError:
dfo = pd.DataFrame()
# Land boundaries
n1 = df[df.data.str.contains("land boundaries")].index
n1 = n1.values[0]
nlb = df.loc[n1, "data"].split("=")[0].strip()
nlb = int(nlb)
nlbn = df.loc[n1 + 1, "data"].split("=")[0].strip()
nlbn = int(nlbn)
ldic = {}
attr = []
idx = n1 + 2
ili = -1
lli = 1001
for nl in range(nlb):
nn, etype = df.loc[idx, "data"].split("=")[0].strip().split(" ")
nn = int(nn)
etype = int(etype)
label = df.loc[idx, "data"].split("=")[1]
label = label[label.index("land") :]
attr.append([nn, etype, label])
nodes = df.loc[idx + 1 : idx + nn, "data"].astype(int)
idx = idx + nn + 1
li = pd.DataFrame({"node": nodes})
tt = ["land" if etype == 0 else "island"]
idi = [ili if etype == 1 else 1000 + int(label[-1])]
li["type"] = tt[0]
li["id"] = idi[0]
ldic.update({label: li})
if tt[0] == "land":
lli += 1
elif tt[0] == "island":
ili -= 1
else:
raise ValueError(f"mesh boundaries error")
try:
dfl = pd.concat(ldic).droplevel(0).reset_index(drop=True)
except ValueError:
dfl = pd.DataFrame()
# concat boundaries
bbs = pd.concat([dfo, dfl])
bbs.node = bbs.node - 1 # start_index = 0
bbs = bbs.reset_index(drop=True) # reset index
bbs = bbs[["type", "node", "id"]] # set column order
bbs = bbs.sort_values(["type", "id", "node"]).reset_index(drop=True) # sort
bbs.index.name = "bnodes"
# merge to one xarray DataSet
g = xr.merge([mesh, depth, els, bbs.to_xarray()])
g.attrs = {}
return g
def to_file(self, filename, **kwargs):
logger.info("writing mesh to file {}".format(filename))
nn = self.Dataset.SCHISM_hgrid_node_x.size
n3e = self.Dataset.nSCHISM_hgrid_face.size
with open(filename, "w") as f:
f.write("\t uniform.gr3\n")
f.write("\t {} {}\n".format(n3e, nn))
q = self.Dataset[["SCHISM_hgrid_node_x", "SCHISM_hgrid_node_y", "depth"]].to_dataframe()
q.index = np.arange(1, len(q) + 1)
q.to_csv(
filename,
index=True,
sep="\t",
header=None,
mode="a",
float_format="%.10f",
columns=["SCHISM_hgrid_node_x", "SCHISM_hgrid_node_y", "depth"],
)
e = pd.DataFrame(
self.Dataset.SCHISM_hgrid_face_nodes.dropna(dim="nMaxSCHISM_hgrid_face_nodes").values,
columns=["a", "b", "c"],
)
e["nv"] = e.apply(lambda row: row.dropna().size, axis=1)
e.index = np.arange(1, len(e) + 1)
e = e.dropna(axis=1).astype(int)
e.loc[:, ["a", "b", "c"]] = e.loc[:, ["a", "b", "c"]] + 1 # convert to fortran (index starts from 1)
e.to_csv(
filename,
index=True,
sep="\t",
header=None,
mode="a",
columns=["nv", "a", "b", "c"],
)
bs = self.Dataset[["node", "id", "type"]].to_dataframe()
# open boundaries
number_of_open_boundaries = bs.loc[bs.type == "open"].id
if not number_of_open_boundaries.empty:
number_of_open_boundaries = number_of_open_boundaries.max()
else:
number_of_open_boundaries = 0
number_of_open_boundaries_nodes = bs.loc[bs.type == "open"].shape[0]
if number_of_open_boundaries > 0:
with open(filename, "a") as f:
f.write("{} = Number of open boundaries\n".format(number_of_open_boundaries))
f.write("{} = Total number of open boundary nodes\n".format(number_of_open_boundaries_nodes))
for i in range(1, number_of_open_boundaries + 1):
dat = bs.loc[bs.id == i, "node"] + 1 # fortran
f.write("{} = Number of nodes for open boundary {}\n".format(dat.size, i))
dat.to_csv(f, index=None, header=False)
else:
with open(filename, "a") as f:
f.write("{} = Number of open boundaries\n".format(0))
f.write("{} = Total number of open boundary nodes\n".format(0))
# land boundaries
number_of_land_boundaries = bs.loc[bs.type == "land"].id
if not number_of_land_boundaries.empty:
number_of_land_boundaries = number_of_land_boundaries.max() - 1000
else:
number_of_land_boundaries = 0
number_of_land_boundaries_nodes = bs.loc[bs.type == "land"].shape[0]
number_of_island_boundaries = bs.loc[bs.type == "island"].id
if not number_of_island_boundaries.empty:
number_of_island_boundaries = number_of_island_boundaries.min()
else:
number_of_island_boundaries = 0
number_of_island_boundaries_nodes = bs.loc[bs.type == "island"].shape[0]
nlb = number_of_land_boundaries - number_of_island_boundaries
nlbn = number_of_land_boundaries_nodes + number_of_island_boundaries_nodes
if nlb > 0:
with open(filename, "a") as f:
f.write("{} = Number of land boundaries\n".format(nlb))
f.write("{} = Total number of land boundary nodes\n".format(nlbn))
ik = 1
else:
with open(filename, "a") as f:
f.write("{} = Number of land boundaries\n".format(0))
f.write("{} = Total number of land boundary nodes\n".format(0))
if number_of_land_boundaries > 0:
with open(filename, "a") as f:
for i in range(1001, 1000 + number_of_land_boundaries + 1):
dat_ = bs.loc[bs.id == i]
dat = dat_.node + 1 # fortran
f.write("{} {} = Number of nodes for land boundary {}\n".format(dat.size, 0, ik))
dat.to_csv(f, index=None, header=False)
ik += 1
if number_of_island_boundaries < 0:
with open(filename, "a") as f:
for i in range(-1, number_of_island_boundaries - 1, -1):
dat_ = bs.loc[bs.id == i]
dat = dat_.node + 1 # fortran
f.write("{} {} = Number of nodes for land boundary {}\n".format(dat.size, 1, ik))
dat.to_csv(f, index=None, header=False)
ik += 1
def validate(self, **kwargs):
# ---------------------------------------------------------------------
logger.info("start mesh validation\n")
# ---------------------------------------------------------------------
path = kwargs.get("rpath", "./")
if not os.path.exists(path):
os.makedirs(path)
# save bctides.in
bs = self.Dataset[["node", "id", "type"]].to_dataframe()
# open boundaries
number_of_open_boundaries = np.nan_to_num(bs.loc[bs.type == "open"].id.max()).astype(int)
number_of_open_boundaries_nodes = bs.loc[bs.type == "open"].shape[0]
with open(path + "bctides.in", "w") as f:
f.write("Header\n")
f.write("{} {}\n".format(0, 40.0)) # ntip tip_dp
f.write("{}\n".format(0)) # nbfr
f.write("{}\n".format(number_of_open_boundaries)) # number of open boundaries
for i in range(1, number_of_open_boundaries + 1):
nnodes = bs.loc[bs.id == i, "node"].shape[0]
f.write(
"{} {} {} {} {}\n".format(nnodes, 2, 0, 0, 0)
) # number of nodes on the open boundary segment j (corresponding to hgrid.gr3), B.C. flags for elevation, velocity, temperature, and salinity
f.write("{}\n".format(0)) # ethconst !constant elevation value for this segment
# save vgrid.in
with open(path + "vgrid.in", "w") as f:
f.write("{}\n".format(2)) # ivcor (1: LSC2; 2: SZ)
f.write(
"{} {} {}\n".format(2, 1, 1.0e6)
) # nvrt(=Nz); kz (# of Z-levels); hs (transition depth between S and Z)
f.write("Z levels\n") # Z levels !Z-levels in the lower portion
f.write(
"{} {}\n".format(1, -1.0e6)
) #!level index, z-coordinates, z-coordinate of the last Z-level must match -hs
f.write("S levels\n") # S-levels below
f.write(
"{} {} {}\n".format(40.0, 1.0, 1.0e-4)
) # constants used in S-transformation: h_c, theta_b, theta_f
f.write("{} {}\n".format(1, -1.0)) # first S-level (sigma-coordinate must be -1)
f.write("{} {}\n".format(2, 0.0)) # levels index, sigma-coordinate, last sigma-coordinate must be 0
# save param.nml
config_file = DATA_PATH + "param_val.nml"
params = f90nml.read(config_file)
params.write(path + "param.nml", force=True)
# save hgrid.gr3
self.to_file(filename=path + "hgrid.gr3")
try:
bin_path = os.environ["SCHISM"]
except:
bin_path = kwargs.get("epath", None)
if bin_path is None:
# ------------------------------------------------------------------------------
logger.warning("Schism executable path (epath) not given -> using default \n")
# ------------------------------------------------------------------------------
bin_path = "schism"
tools.create_schism_mpirun_script(
target_dir=path,
cmd=bin_path,
script_name="launchSchism.sh",
ncores=1,
)
# note that cwd is the folder where the executable is
ex = subprocess.Popen(
args=["./launchSchism.sh"],
cwd=path,
shell=True,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
) # , bufsize=1)
out, err = ex.communicate()[:]
if "successfully" in str(out):
# ---------------------------------------------------------------------
logger.info("mesh is validated for SCHISM\n")
# ---------------------------------------------------------------------
return True
else:
logger.debug(str(out))
# ---------------------------------------------------------------------
logger.info("mesh fails.. exiting \n")
# ---------------------------------------------------------------------
return False
def verify(self, **kwargs):
shp = kwargs.get("coastlines", None)
if shp is not None:
r = verify(self, shp)
return r
else:
logger.warning("No coastlines provided")
pyposeidon.model

Main model module of pyposeidon. It controls the creation, execution & output of a complete simulation based on different hydrodynamic models.
Currently supported: DELFT3D, SCHISM.
set(solver_name, atm=True, tide=False, **kwargs)

Construct a hydrodynamic model based on different solvers.

Due to a limitation of the Library rendering the docstrings, all arguments are marked as `required`, nevertheless they are all `Optional` except `solver_name`.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
solver_name | str | Name of solver used, e.g. `d3d` or `schism`. | required |
atm | bool | Flag for using meteo forcing. Defaults to `True`. | True |
tide | bool | Flag for using tidal configuration. Defaults to `False`. | False |
Source code in pyposeidon/model.py
def set(solver_name, atm=True, tide=False, **kwargs):
"""
Construct a hydrodynamic model based on different solvers.
!!! danger ""
Due to a limitation of the Library rendering the docstrings, all arguments are marked
as `required`, nevertheless they are all `Optional` except `solver_name`.
Args:
solver_name str: Name of solver used, e.g. `d3d` or `schism`.
atm bool: Flag for using meteo forcing. Defaults to `True`.
tide bool: Flag for using tidal configuration. Defaults to `False`.
"""
solver_class = tools.get_solver(solver_name=solver_name)
instance = solver_class(atm=atm, tide=tide, **kwargs)
return instance
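Putting it together, an end-to-end sketch (all paths, dates and the window are placeholders; `gmsh` and the SCHISM executable are assumed available):

```python
import pyposeidon.model as pm

model = pm.set(
    solver_name="schism",
    geometry={"lon_min": -10.0, "lon_max": 5.0, "lat_min": 30.0, "lat_max": 45.0},
    coastlines="./coastlines.shp",   # hypothetical
    mesh_generator="gmsh",
    start_date="2020-01-01",
    time_frame="12h",
    meteo_source="./era5_uvp.grib",  # hypothetical
    dem_source="./gebco.nc",         # hypothetical
    rpath="./run/",
)
model.execute()  # create the inputs, save the model json, run the solver
```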
pyposeidon.schism

Schism model of pyposeidon. It controls the creation, execution & output of a complete simulation based on SCHISM.

Schism
Source code in pyposeidon/schism.py
class Schism:
def __init__(self, **kwargs):
"""
Create a Schism solver
!!! danger ""
Due to a limitation of the Library rendering the docstrings, all arguments are marked
as `required`, nevertheless they are all `Optional`.
Args:
rfolder str: The path to a directory containing the results of a Model solved by Schism.
geometry Union[dict, str, GeoDataFrame]: A `GeoDataFrame` or the path to a shapefile or
a dict defining the lat/lon window.
load_mesh bool: Flag indicating whether to load the mesh or not. Defaults to `False`.
load_meteo bool: Flag indicating whether to load the meteo data or not. Defaults to
`False`.
coastlines Union[str, GeoDataFrame]: A `GeoDataFrame` or the path to a shapefile which
describes the coastlines.
tag str: The model's "tag". Defaults to `"schism"`.
tide bool: Flag indicating whether to use tidal forcing. Defaults to `False`.
atm bool: Flag for using meteo forcing. Defaults to `True`.
monitor bool: The solver's monitor. Defaults to `False`.
epath str: The path to the schism executable. If the `SCHISM` env variable has been
set, then it overrides the value passed as the parameter.
start_date str: The date from which the analysis should start. It should be a string parseable
by `pd.to_datetime()`.
end_date str: The date at which the analysis should end. It should be a string parseable by
`pd.to_datetime()`.
time_frame str: The duration of the analysis. It should be a string parseable by
`pd.to_timedelta()`.
date str: Reference date of the run.
rpath str: Path for output of the model. Defaults to `./schism/`.
m_index int: Define the index of the meteo Dataset. Defaults to `1`.
filename str: Path to output the meteo Dataset. Defaults to `sflux/`.
dstamp str: Reference date for station data. Defaults to date.
parameters dict: Overwrite default Schism's parameter values.
meteo_source str: Path or url to meteo data.
dem_source str: Path or url to bathymetric data.
update list[str]: Control the update of the model, e.g. `['dem']` updates only the bathymetry.
Defaults to `["all"]`.
meteo_split_by str: Split the meteo Dataset to multiple files by e.g. `"day"`.
Defaults to `None`.
manning_file str: Path to manning file.
manning float: Set constant value in the manning.gr3 file. Defaults to `0.12`.
windrot_file str: Path to windrot file.
windrot float: Set constant value in the windrot_geo2proj.gr3 file. Defaults to `0.00001`.
station_flags list[int]: Define the flag for station output. Defaults to `[1,0,0,0,0,0,0,0,0]`.
coastal_monitoring bool: Flag for setting all land/island boundary nodes as stations.
Defaults to `False`.
obs str: Path to csv file for station locations. Defaults to `misc/critech.csv`.
nspool_sta int: Related to station nodes setup. Defaults to `1`.
"""
rfolder = kwargs.get("rfolder", None)
if rfolder:
self.read_folder(**kwargs)
self.geometry = kwargs.get("geometry", None)
if self.geometry:
if isinstance(self.geometry, dict):
self.lon_min = self.geometry["lon_min"]
self.lon_max = self.geometry["lon_max"]
self.lat_min = self.geometry["lat_min"]
self.lat_max = self.geometry["lat_max"]
elif isinstance(self.geometry, str):
try:
geo = gp.GeoDataFrame.from_file(self.geometry)
except:
logger.error("geometry argument not a valid geopandas file")
sys.exit(1)
(
self.lon_min,
self.lat_min,
self.lon_max,
self.lat_max,
) = geo.total_bounds
# coastlines
coastlines = kwargs.get("coastlines", None)
if coastlines is not None:
try:
coast = gp.GeoDataFrame.from_file(coastlines)
except:
coast = gp.GeoDataFrame(coastlines)
self.coastlines = coast
if not hasattr(self, "start_date"):
start_date = kwargs.get("start_date", None)
self.start_date = pd.to_datetime(start_date)
if not hasattr(self, "end_date"):
if "time_frame" in kwargs:
time_frame = kwargs.get("time_frame", None)
self.end_date = self.start_date + pd.to_timedelta(time_frame)
elif "end_date" in kwargs:
end_date = kwargs.get("end_date", None)
self.end_date = pd.to_datetime(end_date)
self.time_frame = self.end_date - self.start_date
if not hasattr(self, "date"):
self.date = get_value(self, kwargs, "date", self.start_date)
if not hasattr(self, "end_date"):
# ---------------------------------------------------------------------
logger.warning("model not set properly, No end_date\n")
# ---------------------------------------------------------------------
self.tag = kwargs.get("tag", "schism")
self.tide = kwargs.get("tide", False)
self.atm = kwargs.get("atm", True)
self.monitor = kwargs.get("monitor", False)
try:
self.epath = os.environ["SCHISM"]
except:
self.epath = kwargs.get("epath", None)
self.solver_name = SCHISM_NAME
for attr, value in kwargs.items():
if not hasattr(self, attr):
setattr(self, attr, value)
setattr(self, "misc", {})
# ============================================================================================
# CONFIG
# ============================================================================================
def config(self, config_file=None, output=False, **kwargs):
dic = get_value(self, kwargs, "parameters", None)
# param_file = get_value(self,kwargs,'config_file',None)
if config_file:
# ---------------------------------------------------------------------
logger.info("reading parameter file {}\n".format(config_file))
# ---------------------------------------------------------------------
else:
# ---------------------------------------------------------------------
logger.info("using default parameter file ...\n")
# ---------------------------------------------------------------------
config_file = DATA_PATH + "param.nml"
params = f90nml.read(config_file)
# update key values
patch = {
"start_year": self.start_date.year,
"start_month": self.start_date.month,
"start_day": self.start_date.day,
"start_hour": self.start_date.hour,
}
params = unml(params, patch)
# update
if dic:
params = unml(params, dic)
# test rnday
if float(params["CORE"]["rnday"]) * 24 * 3600 > (self.end_date - self.start_date).total_seconds():
# ---------------------------------------------------------------------
logger.warning("rnday larger than simulation range\n")
logger.warning(
"rnday={} while simulation time is {}\n".format(
params["core"]["rnday"],
(self.end_date - self.start_date).total_seconds() / (3600 * 24.0),
)
)
# ---------------------------------------------------------------------
self.params = params
if output:
# save params
# ---------------------------------------------------------------------
logger.info("output param.nml file ...\n")
# ---------------------------------------------------------------------
path = get_value(self, kwargs, "rpath", "./schism/")
self.params.write(path + "param.nml", force=True)
# ============================================================================================
# METEO
# ============================================================================================
def force(self, **kwargs):
meteo_source = get_value(self, kwargs, "meteo_source", None)
kwargs.update({"meteo_source": meteo_source})
flag = get_value(self, kwargs, "update", ["all"])
z = {**self.__dict__, **kwargs} # merge self and possible kwargs
# check if files exist
if flag:
if ("meteo" in flag) | ("all" in flag):
self.meteo = pmeteo.Meteo(**z)
else:
logger.info("skipping meteo ..\n")
else:
self.meteo = pmeteo.Meteo(**z)
if hasattr(self, "meteo"):
# add 1 hour for Schism issue with end time
ap = self.meteo.Dataset.isel(time=-1)
ap["time"] = ap.time.values + pd.to_timedelta("1H")
self.meteo.Dataset = xr.concat([self.meteo.Dataset, ap], dim="time")
@staticmethod
def to_force(ar0, **kwargs):
logger.info("writing meteo files ..\n")
path = kwargs.get("rpath", "./schism/")
[p, u, v] = kwargs.get("vars", [None, None, None])
ar = ar0.sortby("latitude", ascending=True)
xx, yy = np.meshgrid(ar.longitude.data, ar.latitude.data)
zero = np.zeros(ar[p].data.shape)
date = kwargs.get("date", ar.time[0].data)
udate = pd.to_datetime(date).strftime("%Y-%m-%d")
bdate = pd.to_datetime(date).strftime("%Y %m %d %H").split(" ")
tlist = (ar.time.data - pd.to_datetime([udate]).values).astype("timedelta64[s]") / 3600.0
tlist = tlist.astype(float) / 24.0
bdate = [int(q) for q in bdate[:3]] + [0]
sout = xr.Dataset(
{
"prmsl": (["time", "nx_grid", "ny_grid"], ar[p].data),
"uwind": (["time", "nx_grid", "ny_grid"], ar[u].data),
"vwind": (["time", "nx_grid", "ny_grid"], ar[v].data),
"spfh": (["time", "nx_grid", "ny_grid"], zero),
"stmp": (["time", "nx_grid", "ny_grid"], zero),
"lon": (["nx_grid", "ny_grid"], xx),
"lat": (["nx_grid", "ny_grid"], yy),
},
coords={"time": tlist},
)
sout.attrs = {
"description": "Schism meteo data",
"history": "pyposeidon",
"source": "netCDF4 python module",
}
sout.time.attrs = {
"long_name": "Time",
"standard_name": "time",
"base_date": bdate,
"units": udate,
}
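# NOTE: `tlist` above stores the time axis as fractional days relative to
# `udate`, and `base_date` records that reference date; this pairing is what
# SCHISM's sflux reader expects in order to reconstruct absolute times.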
sout.lat.attrs = {
"units": "degrees_north",
"long_name": "Latitude",
"standard_name": "latitude",
}
sout.prmsl.attrs = {
"units": "Pa",
"long_name": "Pressure reduced to MSL",
"standard_name": "air_pressure_at_sea_level",
}
sout.uwind.attrs = {
"units": "m/s",
"long_name": "Surface Eastward Air Velocity",
"standard_name": "eastward_wind",
}
sout.vwind.attrs = {
"units": "m/s",
"long_name": "Surface Northward Air Velocity",
"standard_name": "northward_wind",
}
sout.spfh.attrs = {
"units": "1",
"long_name": "Surface Specific Humidity (2m AGL)",
"standard_name": "specific_humidity",
}
sout.stmp.attrs = {
"units": "degrees",
"long_name": "Surface Temperature",
"standard_name": "surface temperature",
}
# check if folder sflux exists
if not os.path.exists(path + "sflux"):
os.makedirs(path + "sflux")
m_index = kwargs.get("m_index", 1)
filename = kwargs.get("filename", "sflux/sflux_air_{}.0001.nc".format(m_index))
sout.to_netcdf(path + filename)
# ============================================================================================
# DEM
# ============================================================================================
def bath(self, **kwargs):
# z = self.__dict__.copy()
kwargs["grid_x"] = self.mesh.Dataset.SCHISM_hgrid_node_x.values
kwargs["grid_y"] = self.mesh.Dataset.SCHISM_hgrid_node_y.values
dpath = get_value(self, kwargs, "dem_source", None)
kwargs.update({"dem_source": dpath})
flag = get_value(self, kwargs, "update", ["all"])
# check if files exist
if flag:
if ("dem" in flag) | ("all" in flag):
kwargs.update(
{
"lon_min": self.lon_min,
"lat_min": self.lat_min,
"lon_max": self.lon_max,
"lat_max": self.lat_max,
}
)
self.dem.Dataset = pdem.dem_on_mesh(self.dem.Dataset, **kwargs)
else:
logger.info("dem from mesh file\n")
# ============================================================================================
# EXECUTION
# ============================================================================================
def create(self, **kwargs):
if not kwargs:
kwargs = self.__dict__.copy()
# DEM
self.dem = pdem.Dem(**kwargs)
kwargs.update({"dem_source": self.dem.Dataset})
# Grid
self.mesh = pmesh.set(type="tri2d", **kwargs)
# set lat/lon from file
if hasattr(self, "mesh_file"):
kwargs.update({"lon_min": self.mesh.Dataset.SCHISM_hgrid_node_x.values.min()})
kwargs.update({"lon_max": self.mesh.Dataset.SCHISM_hgrid_node_x.values.max()})
kwargs.update({"lat_min": self.mesh.Dataset.SCHISM_hgrid_node_y.values.min()})
kwargs.update({"lat_max": self.mesh.Dataset.SCHISM_hgrid_node_y.values.max()})
self.lon_min = self.mesh.Dataset.SCHISM_hgrid_node_x.values.min()
self.lon_max = self.mesh.Dataset.SCHISM_hgrid_node_x.values.max()
self.lat_min = self.mesh.Dataset.SCHISM_hgrid_node_y.values.min()
self.lat_max = self.mesh.Dataset.SCHISM_hgrid_node_y.values.max()
# get bathymetry
self.bath(**kwargs)
# get boundaries
# self.bc()
# get meteo
if self.atm:
self.force(**kwargs)
# get tide
if self.tide:
self.tidebc()
self.config(**kwargs)
def output(self, **kwargs):
path = get_value(self, kwargs, "rpath", "./schism/")
flag = get_value(self, kwargs, "update", ["all"])
split_by = get_value(self, kwargs, "meteo_split_by", None)
if not os.path.exists(path):
os.makedirs(path)
# save sflux_inputs.txt
if not os.path.exists(path + "sflux"):
os.makedirs(path + "sflux")
with open(path + "sflux/sflux_inputs.txt", "w") as f:
f.write("&sflux_inputs\n")
f.write("/ \n\n")
# save bctides.in
bs = self.mesh.Dataset[["node", "id", "type"]].to_dataframe()
# open boundaries
number_of_open_boundaries = bs.loc[bs.type == "open"].id
if not number_of_open_boundaries.empty:
number_of_open_boundaries = number_of_open_boundaries.max()
else:
number_of_open_boundaries = 0
number_of_open_boundaries_nodes = bs.loc[bs.type == "open"].shape[0]
with open(path + "bctides.in", "w") as f:
f.write("Header\n")
f.write("{} {}\n".format(0, 40.0)) # ntip tip_dp
f.write("{}\n".format(0)) # nbfr
f.write("{}\n".format(number_of_open_boundaries)) # number of open boundaries
for i in range(1, number_of_open_boundaries + 1):
nnodes = bs.loc[bs.id == i, "node"].shape[0]
f.write(
"{} {} {} {} {}\n".format(nnodes, 2, 0, 0, 0)
) # number of nodes on the open boundary segment j (corresponding to hgrid.gr3), B.C. flags for elevation, velocity, temperature, and salinity
f.write("{}\n".format(0)) # ethconst !constant elevation value for this segment
# save vgrid.in
with open(path + "vgrid.in", "w") as f:
f.write("{}\n".format(2)) # ivcor (1: LSC2; 2: SZ)
f.write(
"{} {} {}\n".format(2, 1, 1.0e6)
) # nvrt(=Nz); kz (# of Z-levels); hs (transition depth between S and Z)
f.write("Z levels\n") # Z levels !Z-levels in the lower portion
f.write(
"{} {}\n".format(1, -1.0e6)
) #!level index, z-coordinates, z-coordinate of the last Z-level must match -hs
f.write("S levels\n") # S-levels below
f.write(
"{} {} {}\n".format(40.0, 1.0, 1.0e-4)
) # constants used in S-transformation: h_c, theta_b, theta_f
f.write("{} {}\n".format(1, -1.0)) # first S-level (sigma-coordinate must be -1)
f.write("{} {}\n".format(2, 0.0)) # levels index, sigma-coordinate, last sigma-coordinate must be 0
# save param.nml
self.params.write(path + "param.nml", force=True)
# save hgrid.gr3
try:
try:
bat = -self.dem.Dataset.fval.values.astype(float) # minus for the hydro run
if np.isnan(bat).sum() != 0:
raise Exception("Bathymetry contains NaNs")
except:
logger.warning("Bathymetric values fval contain NaNs, using ival values ..\n")
bat = -self.dem.Dataset.ival.values.astype(float) # minus for the hydro run
self.mesh.Dataset.depth.loc[: bat.size] = bat
self.mesh.to_file(filename=path + "hgrid.gr3")
copyfile(path + "hgrid.gr3", path + "hgrid.ll")
logger.info("updating bathymetry ..\n")
except AttributeError as e:
logger.info("Keeping bathymetry from hgrid.gr3 ..\n")
copyfile(self.mesh_file, path + "hgrid.gr3") # copy original grid file
copyfile(path + "hgrid.gr3", path + "hgrid.ll")
# manning file
manfile = path + "manning.gr3"
if hasattr(self, "manning_file"):
copyfile(self.manning_file, manfile) # copy original manning file
if self.manning_file == manfile:
logger.info("Keeping manning file ..\n")
manning = get_value(self, kwargs, "manning", 0.12)
nn = self.mesh.Dataset.nSCHISM_hgrid_node.size
n3e = self.mesh.Dataset.nSCHISM_hgrid_face.size
with open(manfile, "w") as f:
f.write("\t 0 \n")
f.write("\t {} {}\n".format(n3e, nn))
df = self.mesh.Dataset[["SCHISM_hgrid_node_x", "SCHISM_hgrid_node_y", "depth"]].to_dataframe()
df["man"] = manning
df.index = np.arange(1, len(df) + 1)
df.to_csv(
manfile,
index=True,
sep="\t",
header=None,
mode="a",
float_format="%.10f",
columns=["SCHISM_hgrid_node_x", "SCHISM_hgrid_node_y", "man"],
)
logger.info("Manning file created..\n")
# windrot_geo2proj
windfile = path + "windrot_geo2proj.gr3"
if hasattr(self, "windrot_file"):
copyfile(self.windrot_file, windfile) # copy original grid file
if self.windrot_file == windfile:
logger.info("Keeping windrot_geo2proj file ..\n")
windrot = get_value(self, kwargs, "windrot", 0.00001)
with open(windfile, "w") as f:
f.write("\t 0 \n")
f.write("\t {} {}\n".format(n3e, nn))
df = self.mesh.Dataset[["SCHISM_hgrid_node_x", "SCHISM_hgrid_node_y", "depth"]].to_dataframe()
df["windrot"] = windrot
df.index = np.arange(1, len(df) + 1)
df.to_csv(
windfile,
index=True,
sep="\t",
header=None,
mode="a",
float_format="%.10f",
columns=["SCHISM_hgrid_node_x", "SCHISM_hgrid_node_y", "windrot"],
)
logger.info("Windrot_geo2proj file created..\n")
# save meteo
m_index = get_value(self, kwargs, "m_index", 1)
if hasattr(self, "atm"):
try:
if split_by:
times, datasets = zip(*self.meteo.Dataset.groupby("time.{}".format(split_by)))
mpaths = ["sflux/sflux_air_{}.{:04d}.nc".format(m_index, t + 1) for t in np.arange(len(times))]
for das, mpath in list(zip(datasets, mpaths)):
self.to_force(
das, vars=["msl", "u10", "v10"], rpath=path, filename=mpath, date=self.date, **kwargs
)
else:
self.to_force(self.meteo.Dataset, vars=["msl", "u10", "v10"], rpath=path, **kwargs)
except AttributeError as e:
logger.warning("no meteo data available.. no update..\n")
pass
calc_dir = get_value(self, kwargs, "rpath", "./schism/")
try:
bin_path = os.environ["SCHISM"]
except:
bin_path = get_value(self, kwargs, "epath", None)
if bin_path is None:
# ------------------------------------------------------------------------------
logger.warning("Schism executable path (epath) not given -> using default \n")
# ------------------------------------------------------------------------------
bin_path = "schism"
tools.create_schism_mpirun_script(
target_dir=calc_dir,
cmd=bin_path,
script_name="launchSchism.sh",
)
# ---------------------------------------------------------------------
logger.info("output done\n")
# ---------------------------------------------------------------------
def run(self, **kwargs):
calc_dir = get_value(self, kwargs, "rpath", "./schism/")
# ---------------------------------------------------------------------
logger.info("executing model\n")
# ---------------------------------------------------------------------
# note that cwd is the folder where the executable is
ex = subprocess.Popen(
args=["./launchSchism.sh"],
cwd=calc_dir,
shell=True,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
) # , bufsize=1)
with open(calc_dir + "err.log", "w") as f:
for line in iter(ex.stderr.readline, b""):
f.write(line.decode(sys.stdout.encoding))
logger.info(line.decode(sys.stdout.encoding))
ex.stderr.close()
with open(calc_dir + "run.log", "w") as f:
for line in iter(ex.stdout.readline, b""):
f.write(line.decode(sys.stdout.encoding))
logger.info(line.decode(sys.stdout.encoding))
ex.stdout.close()
# ---------------------------------------------------------------------
logger.info("FINISHED\n")
# ---------------------------------------------------------------------
def save(self, **kwargs):
path = get_value(self, kwargs, "rpath", "./schism/")
lista = [key for key, value in self.__dict__.items() if key not in ["meteo", "dem", "mesh"]]
dic = {k: self.__dict__.get(k, None) for k in lista}
mesh = self.__dict__.get("mesh", None)
if isinstance(mesh, str):
dic.update({"mesh": mesh})
else:
dic.update({"mesh": mesh.__class__.__name__})
dem = self.__dict__.get("dem", None)
if isinstance(dem, str):
dic.update({"dem": dem})
elif isinstance(dem, pdem.Dem):
dic.update({"dem_attrs": dem.Dataset.elevation.attrs})
meteo = self.__dict__.get("meteo", None)
if isinstance(meteo, str):
dic.update({"meteo": meteo})
elif isinstance(meteo, pmeteo.Meteo):
try:
dic.update({"meteo": [meteo.Dataset.attrs]})
except:
dic.update({"meteo": [x.attrs for x in meteo.Dataset]})
coastlines = self.__dict__.get("coastlines", None)
dic.update({"coastlines": coastlines})
dic["version"] = pyposeidon.__version__
for attr, value in dic.items():
if isinstance(value, datetime.datetime):
dic[attr] = value.isoformat()
if isinstance(value, pd.Timedelta):
dic[attr] = value.isoformat()
if isinstance(value, pd.DataFrame):
dic[attr] = value.to_dict()
if isinstance(value, gp.GeoDataFrame):
dic[attr] = value.to_json()
with open(path + self.tag + "_model.json", "w") as f:
json.dump(dic, f, default=myconverter)
def execute(self, **kwargs):
flag = get_value(self, kwargs, "update", ["all"])
if flag:
self.create(**kwargs)
self.output(**kwargs)
if self.monitor:
self.set_obs()
self.save(**kwargs)
self.run(**kwargs)
else:
self.save(**kwargs)
calc_dir = get_value(self, kwargs, "rpath", "./schism/")
try:
bin_path = os.environ["SCHISM"]
except KeyError:
bin_path = get_value(self, kwargs, "epath", None)
if bin_path is None:
# ------------------------------------------------------------------------------
logger.warning("Schism executable path (epath) not given -> using default \n")
# ------------------------------------------------------------------------------
bin_path = "schism"
tools.create_schism_mpirun_script(
target_dir=calc_dir,
cmd=bin_path,
script_name="launchSchism.sh",
)
self.run(**kwargs)
def read_folder(self, rfolder, **kwargs):
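# Reconstruct a model instance from an existing run directory: parse
# param.nml for the simulation dates and optionally reload the mesh
# (hgrid.gr3) and the meteo forcing (sflux/*.nc).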
self.rpath = rfolder
s = glob.glob(rfolder + "/param.nml")
mfiles1 = glob.glob(rfolder + "/sflux/*_1*.nc")
mfiles1.sort()
# check for 2nd meteo
mfiles2 = glob.glob(rfolder + "/sflux/*_2*.nc")
mfiles2.sort()
mfiles = {"1": mfiles1, "2": mfiles2}
mfiles = {k: v for k, v in mfiles.items() if v} # remove empty keys, e.g. no mfiles2
hfile = rfolder + "/hgrid.gr3" # Grid
self.params = f90nml.read(s[0])
mykeys = ["start_year", "start_month", "start_day"]
sdate = [self.params["opt"][x] for x in mykeys] # extract date info from param.nml
sd = "-".join(str(item) for item in sdate) # join in str
sd = pd.to_datetime(sd) # convert to datestamp
sh = [self.params["opt"][x] for x in ["start_hour", "utc_start"]] # get time attrs
sd = sd + pd.to_timedelta("{}H".format(sh[0] + sh[1])) # complete start_date
ed = sd + pd.to_timedelta("{}D".format(self.params["core"]["rnday"])) # compute end date based on rnday
self.start_date = sd # set attrs
self.end_date = ed
load_mesh = get_value(self, kwargs, "load_mesh", False)
if load_mesh:
try:
self.mesh = pmesh.set(type="tri2d", mesh_file=hfile)
except:
logger.warning("Loading mesh failed")
pass
else:
logger.warning("No mesh loaded")
load_meteo = get_value(self, kwargs, "load_meteo", False)
# meteo
if load_meteo is True:
try:
pm = []
for key, val in mfiles.items():
msource = xr.open_mfdataset(val)
pm.append(msource)
if len(pm) == 1:
pm = pm[0]
self.meteo = pmeteo.Meteo(meteo_source=pm)
except:
pm = []
for key, val in mfiles.items():
ma = []
for ifile in val:
g = xr.open_dataset(ifile)
ts = "-".join(g.time.attrs["base_date"].astype(str)[:3])
time_r = pd.to_datetime(ts)
try:
times = time_r + pd.to_timedelta(g.time.values, unit="D").round("H")
g = g.assign_coords({"time": times})
ma.append(g)
except:
logger.warning("Loading meteo failed")
break
if ma:
msource = xr.merge(ma)
pm.append(msource)
if len(pm) == 1:
pm = pm[0]
self.meteo = pmeteo.Meteo(meteo_source=pm)
else:
logger.warning("No meteo loaded")
def global2local(self, **kwargs):
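# Parse the outputs/local_to_global_* files written by SCHISM's domain
# decomposition and build lookup tables that map each core's local
# element/node/side numbering back to the global mesh.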
path = get_value(self, kwargs, "rpath", "./schism/")
# Read the global node index distribution to the cores
gfiles = glob.glob(path + "outputs/local_to_global_*")
gfiles.sort()
# create a dict from filenames to identify parts in the dataframes below
keys = []
for name in gfiles:
keys.append("core{}".format(name.split("/")[-1].split("_")[-1]))
# Parsing the files
# We read from the first file the header (it is the same for all)
with open(gfiles[0], "r") as f:
header = pd.read_csv(
f,
header=None,
nrows=1,
delim_whitespace=True,
names=[
"ns_global",
"ne_global",
"np_global",
"nvrt",
"nproc",
"ntracers",
"T",
"S",
"GEN",
"AGE",
"SED3D",
"EcoSim",
"ICM",
"CoSINE",
"Feco",
"TIMOR",
"FABM",
"DVD",
],
)
# get the number of elems from all files
nels = []
for i in range(len(gfiles)):
with open(gfiles[i], "r") as f:
ne = pd.read_csv(f, skiprows=2, header=None, nrows=1)
nels.append(ne.values.flatten()[0].astype(int))
# read and add them to pandas DataFrame
frames = np.empty(len(gfiles), dtype=object)
for i in range(len(gfiles)):
with open(gfiles[i], "r") as f:
frames[i] = pd.read_csv(
f,
skiprows=3,
header=None,
nrows=nels[i],
names=["local", "global_n"],
delim_whitespace=True,
)
elems = pd.concat(frames, keys=keys)
# get the number of nodes from all files
nq = []
for i in range(len(gfiles)):
with open(gfiles[i], "r") as f:
nn = pd.read_csv(f, skiprows=nels[i] + 3, header=None, nrows=1)
nq.append(nn.values.flatten()[0].astype(int))
# read and add them to pandas DataFrame
nframes = np.empty(len(gfiles), dtype=object)
for i in range(len(gfiles)):
with open(gfiles[i], "r") as f:
nframes[i] = pd.read_csv(
f,
skiprows=nels[i] + 4,
header=None,
nrows=nq[i],
names=["local", "global_n"],
delim_whitespace=True,
)
nodes = pd.concat(nframes, keys=keys)
# get the number of edges
nw = []
for i in range(len(gfiles)):
with open(gfiles[i], "r") as f:
nb = pd.read_csv(f, skiprows=nels[i] + nq[i] + 4, header=None, nrows=1)
nw.append(nb.values.flatten()[0].astype(int))
# read and add them to pandas DataFrame
wframes = np.empty(len(gfiles), dtype=object)
for i in range(len(gfiles)):
with open(gfiles[i], "r") as f:
wframes[i] = pd.read_csv(
f,
skiprows=nels[i] + nq[i] + 5,
header=None,
nrows=nw[i],
names=["local", "global_n"],
delim_whitespace=True,
)
re = pd.concat(wframes, keys=keys)
# read secondary headers
with open(gfiles[0], "r") as f:
h0 = pd.read_csv(
f,
skiprows=nels[0] + nq[0] + nw[0] + 6,
header=None,
nrows=1,
delim_whitespace=True,
names=[
"start_year",
"start_month",
"start_day",
"start_hour",
"utc_start",
],
)
with open(gfiles[0], "r") as f:
h1 = pd.read_csv(
f,
skiprows=nels[0] + nq[0] + nw[0] + 7,
header=None,
nrows=1,
delim_whitespace=True,
names=[
"nrec",
"dtout",
"nspool",
"nvrt",
"kz",
"h0",
"h_s",
"h_c",
"theta_b",
"theta_f",
"ics",
],
)
ztots = ["ztot_" + str(i) for i in range(1, h1.loc[:, "kz"].values[0] - 1)]
sigmas = ["sigma_" + str(i) for i in range(h1.loc[:, "nvrt"].values[0] - h1.loc[:, "kz"].values[0] + 1)]
# read secondary header
with open(gfiles[0], "r") as f:
h2 = pd.read_csv(
f,
skiprows=nels[0] + nq[0] + nw[0] + 8,
header=None,
nrows=2,
delim_whitespace=True,
)
h2 = h2.T
h2.columns = ztots + sigmas
# combine headers
self.misc.update({"header": pd.concat([h0, h1, h2], axis=1)})
# read lat/lon from all files
gframes = np.empty(len(gfiles), dtype=object)
for i in range(len(gfiles)):
with open(gfiles[i], "r") as f:
gframes[i] = pd.read_csv(
f,
skiprows=nels[i] + nq[i] + nw[i] + 11,
header=None,
nrows=nq[i],
delim_whitespace=True,
names=["lon", "lat", "depth", "kbp00"],
)
mesh = pd.concat(gframes, keys=keys)
# Dropping duplicates
drops = nodes.reset_index()[nodes.reset_index().duplicated("global_n")].index.to_list()
cnodes = nodes.global_n.drop_duplicates() # drop duplicate global nodes and store the values to an array
mesh = mesh.reset_index().drop(drops) # Use the mask from nodes to match mesh
mesh = mesh.set_index(["level_0", "level_1"])
mesh.index = mesh.index.droplevel() # drop multi-index
mesh = mesh.reset_index(drop=True) # reset index
mesh.index = cnodes.values - 1 # reindex based on the global index, -1 for the python convention
grd = mesh.sort_index() # sort with the new index (that is the global_n)
self.misc.update({"grd": grd.reset_index(drop=True)}) # reindex for final version
# Read tessellation
eframes = np.empty(len(gfiles), dtype=object)
for i in range(len(gfiles)):
with open(gfiles[i], "r") as f:
eframes[i] = pd.read_csv(
f,
skiprows=nels[i] + nq[i] + nw[i] + nq[i] + 11,
header=None,
nrows=nels[i],
delim_whitespace=True,
names=["type", "a", "b", "c", "d"],
)
tri = pd.concat(eframes, keys=keys)
# Replace local index with the global one
for key in keys:
nod = nodes.loc[key].copy()  # make a copy of the core
nod = nod.set_index("local")  # re-index using the local values (in essence, make the index start from 1)
tri.loc[key, "ga"] = nod.reindex(tri.loc[key, "a"].values).values
tri.loc[key, "gb"] = nod.reindex(tri.loc[key, "b"].values).values
tri.loc[key, "gc"] = nod.reindex(tri.loc[key, "c"].values).values
tri.loc[key, "gd"] = nod.reindex(tri.loc[key, "d"].values).values
tri = tri.drop(["a", "b", "c", "d"], axis=1) # drop local references
# sort
gt3 = tri.loc[:, ["type", "ga", "gb", "gc", "gd"]].copy() # make a copy
gt3.index = gt3.index.droplevel() # drop multi-index
gt3 = gt3.reset_index(drop=True)
# Now we need to put them in order based on the global index in elems
gt3.index = elems.global_n.values # we set the index equal to the global_n column
gt3 = gt3.sort_index() # sort them
# add nan column in place of the fourth node. NOTE: This needs to be tested for quadrilaterals
# gt3['gd']=np.nan
gt3 = gt3.reset_index() # reset to add more columns without problems
## Add mean x, y of the elements. To be used in the output
gt3["x1"] = grd.loc[gt3["ga"].values - 1, "lon"].values # lon of the index, -1 for python convention
gt3["y1"] = grd.loc[gt3["ga"].values - 1, "lat"].values # lat of the index
gt3["x2"] = grd.loc[gt3["gb"].values - 1, "lon"].values
gt3["y2"] = grd.loc[gt3["gb"].values - 1, "lat"].values
gt3["x3"] = grd.loc[gt3["gc"].values - 1, "lon"].values
gt3["y3"] = grd.loc[gt3["gc"].values - 1, "lat"].values
gt3["xc"] = gt3[["x1", "x2", "x3"]].mean(axis=1) # mean lon of the element
gt3["yc"] = gt3[["y1", "y2", "y3"]].mean(axis=1)
# take care of quads
gt3["x4"] = np.nan
gt3["y4"] = np.nan
ide = gt3.loc[gt3.type == 4].index.values  # get the index of the quadrilateral elements
gt3.loc[ide, "x4"] = grd.loc[gt3.loc[ide, "gd"].values - 1, "lon"].values
gt3.loc[ide, "y4"] = grd.loc[gt3.loc[ide, "gd"].values - 1, "lat"].values
gt3.loc[ide, "xc"] = gt3[["x1", "x2", "x3", "x4"]].mean(axis=1) # mean lon of the element
gt3.loc[ide, "yc"] = gt3[["y1", "y2", "y3", "y4"]].mean(axis=1)
## min kbe
gt3["kbe4"] = np.nan # for quads
gt3["kbe1"] = grd.loc[gt3["ga"] - 1, "kbp00"].values
gt3["kbe2"] = grd.loc[gt3["gb"] - 1, "kbp00"].values
gt3["kbe3"] = grd.loc[gt3["gc"] - 1, "kbp00"].values
gt3.loc[ide, "kbe4"] = grd.loc[gt3.loc[ide, "gd"].values - 1, "kbp00"].values
gt3["kbe"] = gt3[["kbe1", "kbe2", "kbe3"]].min(axis=1)
gt3.loc[ide, "kbe"] = gt3[["kbe1", "kbe2", "kbe3", "kbe4"]].min(axis=1)
self.misc.update({"gt3": gt3.set_index("index")}) # set index back
# Dropping duplicates
self.misc.update({"melems": elems.loc[elems.global_n.drop_duplicates().index]}) # create the retaining mask
self.misc.update({"msides": re.loc[re.global_n.drop_duplicates().index]}) # keep only one of the duplicates
self.misc.update(
{"mnodes": nodes.loc[nodes.global_n.drop_duplicates().index]}
) # keep only one of the duplicates
def hotstart(self, it=None, **kwargs):
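# Merge the per-core outputs/hotstart_*_<it>.nc files into a single
# global hotstart file, using the index maps built by global2local().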
path = get_value(self, kwargs, "rpath", "./schism/")
if not "melems" in self.misc:
self.global2local(**kwargs)
hfiles = glob.glob(path + "outputs/hotstart_*_{}.nc".format(it))
hfiles.sort()
# store them in a list
out = []
for i in range(len(hfiles)):
out.append(xr.open_dataset(hfiles[i]))
# Create dataset
side = []
node = []
el = []
one = []
for key in out[0].variables:
if "nResident_side" in out[0][key].dims:
r = self.combine_(key, out, self.misc["msides"], "nResident_side")
side.append(r)
elif "nResident_node" in out[0][key].dims:
r = self.combine_(key, out, self.misc["mnodes"], "nResident_node")
node.append(r)
elif "nResident_elem" in out[0][key].dims:
r = self.combine_(key, out, self.misc["melems"], "nResident_elem")
el.append(r)
elif len(out[0][key].dims) == 1:
one.append(out[0][key])
side = xr.merge(side).rename({"nResident_side": "side"})
el = xr.merge(el).rename({"nResident_elem": "elem"})
node = xr.merge(node).rename({"nResident_node": "node"})
one = xr.merge(one).rename({"one": "one_new", "it": "iths"})
# merge
xdat = xr.merge([side, el, node, one])
hfile = "hotstart_it={}.nc".format(xdat.iths.values[0])
logger.info("saving hotstart file\n")
xdat.to_netcdf(path + "outputs/{}".format(hfile))
## Any variable
def combine(self, out, g2l, name):
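# Concatenate per-core pandas objects into one global object: for every
# core, keep only the rows retained in the g2l mask (ghost duplicates
# were already dropped) and remap their local indices to global ones.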
keys = g2l.index.get_level_values(0).unique()
r = []
for i in range(len(out)):
v = out[i].to_pandas()
v.index += 1
mask = g2l.loc[keys[i], "local"]
vm = v.loc[mask]
vm.index = g2l.loc[keys[i], "global_n"].values
r.append(vm)
r = pd.concat(r).sort_index()
r.index -= 1
r.index.name = name
return r
def combine_(self, var, out, g2l, name):
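# Variable-level wrapper around combine(): 2D variables are combined
# directly, while 3D variables are combined slice-by-slice along their
# third dimension and re-assembled with xr.concat.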
if len(out[0][var].shape) == 3:
dd = []
for i in range(out[0][var].shape[2]):
o = []
for j in range(len(out)):
o.append(out[j][var].loc[{out[j][var].dims[2]: i}])
r = self.combine(o, g2l, name)
dd.append(xr.DataArray(r.values, dims=out[0][var].dims[:2], name=var))
tr = xr.concat(dd, dim=out[0][var].dims[2])
a, b, c = out[0][var].dims
tr = tr.transpose(a, b, c)
return tr
elif len(out[0][var].shape) < 3:
o = []
for j in range(len(out)):
o.append(out[j][var])
r = self.combine(o, g2l, name)
return xr.DataArray(r.values, dims=list(out[0][var].dims), name=var)
def xcombine(self, tfs):
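# Combine one time slice of the per-core schout datasets into a global
# dataset, dispatching each variable on its horizontal dimension
# (face/element, node or edge).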
# Create dataset
side = []
node = []
el = []
single = []
for key in tfs[0].variables:
if "nSCHISM_hgrid_face" in tfs[0][key].dims:
r = self.combine_(key, tfs, self.misc["melems"], "nSCHISM_hgrid_face")
el.append(r)
elif "nSCHISM_hgrid_node" in tfs[0][key].dims:
r = self.combine_(key, tfs, self.misc["mnodes"], "nSCHISM_hgrid_node")
node.append(r)
elif "nSCHISM_hgrid_edge" in tfs[0][key].dims:
r = self.combine_(key, tfs, self.misc["msides"], "nSCHISM_hgrid_edge")
side.append(r)
elif len(tfs[0][key].dims) == 1:
single.append(tfs[0][key])
side = xr.merge(side)
el = xr.merge(el)
node = xr.merge(node)
single = xr.merge(single)
# merge
return xr.merge([side, el, node, single])
def tcombine(self, hfiles, sdate, times):
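# Apply xcombine() to every time step of the per-core output files and
# concatenate the results along the time dimension.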
xall = []
for k in range(times.shape[0]):
tfs = []
for i in range(len(hfiles)):
tfs.append(xr.open_dataset(hfiles[i]).isel(time=k))
xall.append(self.xcombine(tfs))
xdat = xr.concat(xall, dim="time")
xdat = xdat.assign_coords(time=times)
return xdat
# https://stackoverflow.com/questions/41164630/pythonic-way-of-removing-reversed-duplicates-in-list
@staticmethod
def remove_reversed_duplicates(iterable):
# Create a set for already seen elements
seen = set()
for item in iterable:
# Lists are mutable so we need tuples for the set-operations.
tup = tuple(item)
if tup not in seen:
# First time we see this pair: remember its REVERSE so that the
# flipped duplicate is skipped later, then yield the item below.
seen.add(tup[::-1])
# If you also want to remove normal duplicates uncomment the next line
# seen.add(tup)
yield item
def read_vgrid(self, **kwargs):
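# Parse the vertical grid definition (vgrid.in): the ivcor flag, the
# number of levels, the z-levels and s-levels, and the h_s, h_c,
# theta_b, theta_f constants of the hybrid s-z coordinate.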
logger.info("Read vgrid.in\n")
path = get_value(self, kwargs, "rpath", "./schism/")
vgrid = pd.read_csv(
path + "vgrid.in",
header=None,
index_col=False,
engine="python",
delimiter="!",
)
try:
vgrid = vgrid.drop(1, axis=1)
except:
pass
self.misc.update({"ivcor": int(vgrid.iloc[0][0])})
[Nz, kz, hs] = vgrid.iloc[1].str.split()[0]
self.misc.update({"Nz": int(Nz)})
self.misc.update({"kz": int(kz)})
self.misc.update({"hs": float(hs)})
zlevels = vgrid.iloc[3 : 3 + self.misc["kz"], 0].str.split(n=2, expand=True)
zlevels.columns = ["level_index", "z-coordinates"]
zlevels.set_index("level_index", inplace=True)
self.misc.update({"zlevels": zlevels})
constants_index = 3 + self.misc["kz"] + 1
[h_c, theta_b, theta_f] = vgrid.iloc[constants_index].str.split()[0]
self.misc.update({"h_c": float(h_c)})
self.misc.update({"theta_b": float(theta_b)})
self.misc.update({"theta_f": float(theta_f)})
sl_index0 = constants_index + 1
sl_index1 = sl_index0 + self.misc["Nz"] - self.misc["kz"] + 1
slevels = vgrid.iloc[sl_index0:sl_index1, 0].str.split(n=2, expand=True)
slevels.columns = ["level_index", "s-coordinates"]
slevels.set_index("level_index", inplace=True)
self.misc.update({"slevels": slevels})
def results(self, **kwargs):
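# Combine the per-core outputs/schout_*_*.nc files into global netCDF
# files (one per output stack), attaching the mesh variables and
# CF/UGRID-compliant attributes along the way.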
path = get_value(self, kwargs, "rpath", "./schism/")
if len(self.misc) == 0:
logger.info("retrieving index references ... \n")
self.global2local(**kwargs)
logger.info("... done \n")
# Create mesh xarray Dataset
grd = self.misc["grd"]
gt3 = self.misc["gt3"]
# node based variables
grd.kbp00 = grd.kbp00.astype(int)
xnodes = grd.to_xarray().rename(
{
"lon": "SCHISM_hgrid_node_x",
"lat": "SCHISM_hgrid_node_y",
"kbp00": "node_bottom_index",
"index": "nSCHISM_hgrid_node",
}
)
xnodes = xnodes.drop_vars("nSCHISM_hgrid_node")
# element based variables
gt34 = gt3.loc[:, ["ga", "gb", "gc", "gd"]].values # SCHISM_hgrid_face_nodes
xelems = xr.Dataset(
{
"SCHISM_hgrid_face_nodes": (
["nSCHISM_hgrid_face", "nMaxSCHISM_hgrid_face_nodes"],
gt34 - 1,
), # -> start index = 0
"SCHISM_hgrid_face_x": (
["nSCHISM_hgrid_face"],
gt3.loc[:, "xc"].values,
),
"SCHISM_hgrid_face_y": (
["nSCHISM_hgrid_face"],
gt3.loc[:, "yc"].values,
),
"ele_bottom_index": (["nSCHISM_hgrid_face"], gt3.kbe.values),
}
)
logger.info("done with node based variables \n")
# edge based variables
sides = []
for [etype, ga, gb, gc, gd] in gt3.loc[:, ["type", "ga", "gb", "gc", "gd"]].values:
if etype == 3:
sides.append([gb, gc])
sides.append([gc, ga])
sides.append([ga, gb])
elif etype == 4:
sides.append([gb, gc])
sides.append([gc, gd])
sides.append([gd, ga])
sides.append([ga, gb])
# removing duplicates
sides = list(self.remove_reversed_duplicates(sides))
ed = pd.DataFrame(sides, columns=["node1", "node2"])
# mean x, y
ed["x1"] = grd.loc[ed["node1"].values - 1, "lon"].values # lon of the index, -1 for python convention
ed["y1"] = grd.loc[ed["node1"].values - 1, "lat"].values # lat of the index
ed["x2"] = grd.loc[ed["node2"].values - 1, "lon"].values
ed["y2"] = grd.loc[ed["node2"].values - 1, "lat"].values
ed["xc"] = ed[["x1", "x2"]].mean(axis=1) # mean of the edge index
ed["yc"] = ed[["y1", "y2"]].mean(axis=1)
## min bottom index
ed["kbs1"] = grd.loc[ed["node1"] - 1, "kbp00"].values
ed["kbs2"] = grd.loc[ed["node2"] - 1, "kbp00"].values
ed["kbs"] = ed[["kbs1", "kbs2"]].min(axis=1)
xsides = xr.Dataset(
{
"SCHISM_hgrid_edge_nodes": (
["nSCHISM_hgrid_edge", "two"],
[[x - 1, y - 1] for [x, y] in sides],
), # index from 0
"SCHISM_hgrid_edge_x": (["nSCHISM_hgrid_edge"], ed["xc"].values),
"SCHISM_hgrid_edge_y": (["nSCHISM_hgrid_edge"], ed["yc"].values),
"edge_bottom_index": (["nSCHISM_hgrid_edge"], ed.kbs.values),
}
)
logger.info("done with side based variables \n")
# General properties
header2 = self.misc["header"].apply(pd.to_numeric)
nlist = [
"start_year",
"start_month",
"start_day",
"start_hour",
"utc_start",
"dtout",
"nspool",
"nvrt",
"kz",
"ics",
]
header2[nlist] = header2[nlist].astype(int)
sigmas = [x for x in header2.columns if "sigma" in x]
sigms = header2.loc[:, sigmas].values.flatten() # get sigmas
iwet_dry = 0 # defined by the user
ihgrid_id = -2147483647 # defined by user - 0,dummy_dim,ihgrid_id
one = xr.Dataset(
{
"dry_value_flag": (("one"), [iwet_dry]),
"SCHISM_hgrid": (("one"), [ihgrid_id]),
}
)
# compute cs
klev = np.arange(header2.kz.values[0], header2.nvrt.values[0] + 1)
k = klev - header2.kz.values
theta_b = header2.theta_b.values
theta_f = header2.theta_f.values
cs = (1 - theta_b) * np.sinh(theta_f * sigms[k]) / np.sinh(theta_f) + theta_b * (
np.tanh(theta_f * (sigms[k] + 0.5)) - np.tanh(theta_f * 0.5)
) / (2 * np.tanh(theta_f * 0.5))
Cs = xr.Dataset({"Cs": (("sigma"), cs)}, coords={"sigma": sigms})
header_list = ["ics", "h0", "h_c", "theta_b", "theta_f", "h_s"]
gen = header2[header_list].to_xarray()
gen = gen.rename({"index": "one"})
# merge
gen = xr.merge([gen, Cs, one])
gen = gen.rename(
{
"ics": "coordinate_system_flag",
"h0": "minimum_depth",
"h_c": "sigma_h_c",
"theta_b": "sigma_theta_b",
"theta_f": "sigma_theta_f",
"h_s": "sigma_maxdepth",
}
)
gen = gen.drop_vars("one")
# set timestamp
date = header2.loc[:, ["start_year", "start_month", "start_day", "start_hour", "utc_start"]]
date = date.astype(int)
date.columns = ["year", "month", "day", "hour", "utc"] # rename the columns
# set the start timestamp
sdate = pd.Timestamp(
year=date.year.values[0],
month=date.month.values[0],
day=date.day.values[0],
hour=date.hour.values[0],
tz=date.utc.values[0],
)
logger.info("done with generic variables \n")
# Read Netcdf output files
hfiles = glob.glob(path + "outputs/schout_*_*.nc")
irange_ = [int(x.split("_")[-1].split(".")[0]) for x in hfiles]
irange_ = np.unique(irange_)
irange = get_value(self, kwargs, "rlist", irange_)
self.read_vgrid() # read grid attributes
logger.info("Write combined NetCDF files \n")
# total_xdat = []
for val in tqdm(irange):
hfiles = glob.glob(path + "outputs/schout_*_{}.nc".format(val))
hfiles.sort()
times = xr.open_dataset(hfiles[0]).time
times = pd.to_datetime(times.values, unit="s", origin=sdate.tz_convert(None))
if times.size == 0:
continue
idat = self.tcombine(hfiles, sdate, times)
# MERGE
xc = xr.merge([idat, gen, xnodes, xelems, xsides])
# Choose attrs
if header2.ics.values == 1:
lat_coord_standard_name = "projection_y_coordinate"
lon_coord_standard_name = "projection_x_coordinate"
x_units = "m"
y_units = "m"
lat_str_len = 23
lon_str_len = 23
else:
lat_coord_standard_name = "latitude"
lon_coord_standard_name = "longitude"
x_units = "degrees_east"
y_units = "degrees_north"
lat_str_len = 8
lon_str_len = 9
# set Attrs
xc.SCHISM_hgrid_node_x.attrs = {
"long_name": "node x-coordinate",
"standard_name": lon_coord_standard_name,
"units": x_units,
"mesh": "SCHISM_hgrid",
}
xc.SCHISM_hgrid_node_y.attrs = {
"long_name": "node y-coordinate",
"standard_name": lat_coord_standard_name,
"units": y_units,
"mesh": "SCHISM_hgrid",
}
xc.depth.attrs = {
"long_name": "Bathymetry",
"units": "meters",
"positive": "down",
"mesh": "SCHISM_hgrid",
"location": "node",
}
xc.sigma_h_c.attrs = {
"long_name": "ocean_s_coordinate h_c constant",
"units": "meters",
"positive": "down",
}
xc.sigma_theta_b.attrs = {"long_name": "ocean_s_coordinate theta_b constant"}
xc.sigma_theta_f.attrs = {"long_name": "ocean_s_coordinate theta_f constant"}
xc.sigma_maxdepth.attrs = {
"long_name": "ocean_s_coordinate maximum depth cutoff (mixed s over z boundary)",
"units": "meters",
"positive": "down",
}
xc.Cs.attrs = {
"long_name": "Function C(s) at whole levels",
"positive": "up",
}
xc.dry_value_flag.attrs = {"values": "0: use last-wet value; 1: use junk"}
xc.SCHISM_hgrid_face_nodes.attrs = {
"long_name": "Horizontal Element Table",
"cf_role": "face_node_connectivity",
"start_index": 0,
}
xc.SCHISM_hgrid_edge_nodes.attrs = {
"long_name": "Map every edge to the two nodes that it connects",
"cf_role": "edge_node_connectivity",
"start_index": 0,
}
xc.SCHISM_hgrid_edge_x.attrs = {
"long_name": "x_coordinate of 2D mesh edge",
"standard_name": lon_coord_standard_name,
"units": x_units,
"mesh": "SCHISM_hgrid",
}
xc.SCHISM_hgrid_edge_y.attrs = {
"long_name": "y_coordinate of 2D mesh edge",
"standard_name": lat_coord_standard_name,
"units": y_units,
"mesh": "SCHISM_hgrid",
}
xc.SCHISM_hgrid_face_x.attrs = {
"long_name": "x_coordinate of 2D mesh face",
"standard_name": lon_coord_standard_name,
"units": x_units,
"mesh": "SCHISM_hgrid",
}
xc.SCHISM_hgrid_face_y.attrs = {
"long_name": "y_coordinate of 2D mesh face",
"standard_name": lat_coord_standard_name,
"units": y_units,
"mesh": "SCHISM_hgrid",
}
xc.SCHISM_hgrid.attrs = {
"long_name": "Topology data of 2d unstructured mesh",
"topology_dimension": 2,
"cf_role": "mesh_topology",
"node_coordinates": "SCHISM_hgrid_node_x SCHISM_hgrid_node_y",
"face_node_connectivity": "SCHISM_hgrid_face_nodes",
"edge_coordinates": "SCHISM_hgrid_edge_x SCHISM_hgrid_edge_y",
"face_coordinates": "SCHISM_hgrid_face_x SCHISM_hgrid_face_y",
"edge_node_connectivity": "SCHISM_hgrid_edge_nodes",
}
xc.node_bottom_index.attrs = {
"long_name": "bottom level index at each node",
"units": "non-dimensional",
"mesh": "SCHISM_hgrid",
"location": "node",
"start_index": 0,
}
xc.ele_bottom_index.attrs = {
"long_name": "bottom level index at each element",
"units": "non-dimensional",
"mesh": "SCHISM_hgrid",
"location": "elem",
"start_index": 0,
}
xc.edge_bottom_index.attrs = {
"long_name": "bottom level index at each edge",
"units": "non-dimensional",
"mesh": "SCHISM_hgrid",
"location": "edge",
"start_index": 0,
}
base_date = " ".join([str(x) for x in date.T.values.flatten()])
xc.time.attrs = {
"long_name": "Time",
"base_date": base_date,
"standard_name": "time",
}
xc.sigma.attrs = {
"long_name": "S coordinates at whole levels",
"units": "1",
"standard_name": "ocean_s_coordinate",
"positive": "up",
"h_s": self.misc["hs"],
"h_c": self.misc["h_c"],
"theta_b": self.misc["theta_b"],
"theta_f": self.misc["theta_f"],
"formula_terms": "s: sigma eta: elev depth: depth a: sigma_theta_f b: sigma_theta_b depth_c: sigma_h_c",
}
# Dataset Attrs
xc.attrs = {
"Conventions": "CF-1.0, UGRID-1.0",
"title": "SCHISM Model output",
"source": "SCHISM model output version v10",
"references": "http://ccrm.vims.edu/schismweb/",
"history": "created by pyposeidon",
"comment": "SCHISM Model output",
"type": "SCHISM Model output",
"VisIT_plugin": "https://schism.water.ca.gov/library/-/document_library/view/3476283",
}
xc.to_netcdf(path + "outputs/schout_{}.nc".format(val))
logger.info("done with output netCDF files \n")
def set_obs(self, **kwargs):
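# Map the observation locations (tide gauges by default, optionally all
# land-boundary nodes for coastal monitoring) to their nearest mesh
# nodes, write station.in and enable station output in param.nml.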
path = get_value(self, kwargs, "rpath", "./schism/")
nspool_sta = get_value(self, kwargs, "nspool_sta", 1)
tg_database = get_value(self, kwargs, "obs", DATA_PATH + "critech.csv")
coastal_monitoring = get_value(self, kwargs, "coastal_monitoring", False)
flags = get_value(self, kwargs, "station_flags", [1] + [0] * 8)
station_flag = pd.DataFrame(
{
"elev": flags[0],
"air_pressure": flags[1],
"windx": flags[2],
"windy": flags[3],
"T": flags[4],
"S": flags[5],
"u": flags[6],
"v": flags[7],
"w": flags[8],
},
index=[0],
)
## FOR TIDE GAUGE MONITORING
z = self.__dict__.copy()
tg = obs(**z)
self.obs = tg
logger.info("get in-situ measurements locations \n")
gpoints = np.array(
list(
zip(
self.mesh.Dataset.SCHISM_hgrid_node_x.values,
self.mesh.Dataset.SCHISM_hgrid_node_y.values,
)
)
)
stations = []
mesh_index = []
for l in range(tg.locations.shape[0]):
plat, plon = tg.locations.loc[l, ["latitude", "longitude"]]
cp = closest_node([plon, plat], gpoints)
mesh_index.append(
list(
zip(
self.mesh.Dataset.SCHISM_hgrid_node_x.values,
self.mesh.Dataset.SCHISM_hgrid_node_y.values,
)
).index(tuple(cp))
)
stations.append(cp)
if stations == []:
logger.warning("no observations available\n")
# to df
stations = pd.DataFrame(stations, columns=["SCHISM_hgrid_node_x", "SCHISM_hgrid_node_y"])
stations["z"] = 0
stations.index += 1
stations["gindex"] = mesh_index
stations["name"] = tg.locations.Name.values
stations["id"] = tg.locations.ID.values
stations["group"] = tg.locations.Group.values
stations["longitude"] = tg.locations.longitude.values
stations["latitude"] = tg.locations.latitude.values
if coastal_monitoring:
## FOR COASTAL MONITORING
logger.info("set land boundaries as observation points \n")
# get land boundaries
coasts = [key for key in self.mesh.Dataset.variables if "land_boundary" in key]
# get index
bnodes = []
for i in range(len(coasts)):
bnodes = bnodes + list(self.mesh.Dataset[coasts[i]].dropna("index").astype(int).values)
bnodes = np.unique(bnodes) # keep unique values
# get x,y
xx = self.mesh.Dataset.SCHISM_hgrid_node_x.to_dataframe().loc[bnodes]
yy = self.mesh.Dataset.SCHISM_hgrid_node_y.to_dataframe().loc[bnodes]
# put them in dataframe
coastal_stations = pd.concat([xx, yy], axis=1, sort=False)
coastal_stations["z"] = 0
coastal_stations.reset_index(inplace=True, drop=True)
coastal_stations.index += 1
coastal_stations["gindex"] = bnodes
# append
stations = pd.concat([stations, coastal_stations])
stations.reset_index(inplace=True, drop=True)
stations.index += 1
stations["gindex"] = stations["gindex"].astype(int)
# modify config paramater
self.params["SCHOUT"]["iout_sta"] = 1
self.params["SCHOUT"]["nspool_sta"] = nspool_sta
self.params.write(path + "param.nml", force=True)
self.stations = stations
logger.info("write out stations.in file \n")
# output to file
with open(path + "station.in", "w") as f:
station_flag.to_csv(f, header=None, index=False, sep=" ")
f.write("{}\n".format(stations.shape[0]))
stations.loc[:, ["SCHISM_hgrid_node_x", "SCHISM_hgrid_node_y", "z"]].to_csv(f, header=None, sep=" ")
def get_station_data(self, **kwargs):
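# Read the outputs/staout_* time series of the variables activated in
# station.in into an xarray Dataset (stored as self.time_series).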
path = get_value(self, kwargs, "rpath", "./schism/")
# locate the station files
sfiles = glob.glob(path + "outputs/staout_*")
sfiles.sort()
try:
# get the station flags
flags = pd.read_csv(path + "station.in", header=None, nrows=1, delim_whitespace=True).T
flags.columns = ["flag"]
flags["variable"] = [
"elev",
"air_pressure",
"windx",
"windy",
"T",
"S",
"u",
"v",
"w",
]
vals = flags[flags.values == 1] # get the active ones
except FileNotFoundError:
logger.error("No station.in file present")
return
dstamp = kwargs.get("dstamp", self.date)
dfs = []
for idx in vals.index:
obs = np.loadtxt(sfiles[idx])
df = pd.DataFrame(obs)
df = df.set_index(0)
df.index.name = "time"
df.columns.name = vals.loc[idx, "variable"]
df.index = pd.to_datetime(dstamp) + pd.to_timedelta(df.index, unit="s")
pindex = pd.MultiIndex.from_product([df.T.columns, df.T.index])
r = pd.DataFrame(df.values.flatten(), index=pindex, columns=[vals.loc[idx, "variable"]])
r.index.names = ["time", "node"]
r.index = r.index.set_levels(r.index.levels[1] - 1, level=1)
dfs.append(r.to_xarray())
self.time_series = xr.combine_by_coords(dfs)
def get_output_data(self, **kwargs):
dic = self.__dict__
dic.update(kwargs)
self.data = data.get_output(**dic)
def open_thalassa(self, **kwargs):
# open a Thalassa instance to visualize the output
return
__init__(self, **kwargs)
special
¶
Create a Schism solver
Due to a limitation of the Library rendering the docstrings, all arguments are marked
as `required`; nevertheless, they are all `Optional`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
rfolder | str | The path to a directory containing the results of a Model solved by Schism. | required |
geometry | Union[dict, str, GeoDataFrame] | A `GeoDataFrame` or the path to a shapefile or a dict defining the lat/lon window. | required |
load_mesh | bool | Flag indicating whether to load the mesh or not. Defaults to `False`. | required |
load_meteo | bool | Flag indicating whether to load the meteo data or not. Defaults to `False`. | required |
coastlines | Union[str, GeoDataFrame] | A `GeoDataFrame` or the path to a shapefile which describes the coastlines. | required |
tag | str | The model's "tag". Defaults to `"schism"`. | required |
tide | str | Flag indicating whether to load "tide". Defaults to `False`. | required |
atm | bool | The solver's atm. Defaults to `True`. | required |
monitor | bool | The solver's monitor. Defaults to `False`. | required |
epath | str | The path to the schism executable. If the `SCHISM` env variable has been set, then it overrides the value passed as the parameter. | required |
start_date | str | The date from which the analysis should start. It should be a string parseable by `pd.to_datetime()`. | required |
end_date | str | The date at which the analysis should end. It should be a string parseable by `pd.to_datetime()`. | required |
time_frame | str | The duration of the analysis. It should be a string parseable by `pd.to_datetime()`. | required |
date | str | Reference date of the run. | required |
rpath | str | Path for output of the model. Defaults to `./schism/`. | required |
m_index | int | Define the index of the meteo Dataset. Defaults to `1`. | required |
filename | str | Path to output the meteo Dataset. Defaults to `sflux/`. | required |
dstamp | str | Reference date for station data. Defaults to date. | required |
parameters | dict | Overwrite default Schism's parameter values. | required |
meteo_source | str | Path or url to meteo data. | required |
dem_source | str | Path or url to bathymetric data. | required |
update | list[str] | Control the update of the model e.g. `['dem']` -> updates only bathymetry. Defaults to `["all"]`. | required |
meteo_split_by | str | Split the meteo Dataset to multiple files by e.g. `"day"`. Defaults to `None`. | required |
manning_file | str | Path to manning file. | required |
manning | float | Set constant value in the manning.gr3 file. Defaults to `0.12`. | required |
windrot_file | str | Path to windrot file. | required |
windrot | float | Set constant value in the windrot_geo2proj.gr3 file. Defaults to `0.00001`. | required |
station_flags | list[int] | Define the flag for station output. Defaults to `[1,0,0,0,0,0,0,0,0]`. | required |
coastal_monitoring | bool | Flag for setting all land/island boundary nodes as stations. Defaults to `False`. | required |
obs | str | Path to csv file for station locations. Defaults to `misc/critech.csv`. | required |
nspool_sta | int | Related to station nodes setup. Defaults to `1`. | required |
Source code in pyposeidon/schism.py
def __init__(self, **kwargs):
"""
Create a Schism solver
!!! danger ""
Due to a limitation of the Library rendering the docstrings, all arguments are marked
as `required`, nevertheless they are all `Optional`.
Args:
rfolder str: The path to a directory containing the results of a Model solved by Schism.
geometry Union[dict, str, GeoDataFrame]: A `GeoDataFrame` or the path to a shapefile or
a dict defining the lat/lon window.
load_mesh bool: Flag indicating whether to load the mesh or not. Defaults to `False`.
load_meteo bool: Flag indicating whether to load the meteo data or not. Defaults to
`False`.
coastlines Union[str, GeoDataFrame]: A `GeoDataFrame` or the path to a shapefile which
describes the coastlines.
tag str: The model's "tag". Defaults to `"schism"`.
tide str: Flag indicating whether to load "tide". Defaults to `False`.
atm bool: The solver's atm. Defaults to `True`.
monitor bool: The solver's monitor. Defaults to `False`.
epath str: The path to the schism executable. If the `SCHISM` env variable has been
set, then it overrides the value passed as the parameter.
start_date str: The date from which the analysis should start. It should be a string parseable
by `pd.to_datetime()`.
end_date str: The date at which the analysis should end. It should be a string parseable by
`pd.to_datetime()`.
time_frame str: The duration of the analysis. It should be a string parseable by
`pd.to_datetime()`.
date str: Reference date of the run.
rpath str: Path for output of the model. Defaults to `./schism/`.
m_index int: Define the index of the meteo Dataset. Defaults to `1`.
filename str: Path to output the meteo Dataset. Defaults to `sflux/`.
dstamp str: Reference date for station data. Defaults to date.
parameters dict: Overwrite default Schism's parameter values.
meteo_source str: Path or url to meteo data.
dem_source str: Path or url to bathymetric data.
update list[str]: Control the update of the model e.g `['dem']`-> updates only bathymetry.
Defaults to `["all"]`.
meteo_split_by str: Split the meteo Dataset to multiple files by e.g. `"day"`.
Defaults to `None`.
manning_file str: Path to manning file.
manning float: Set constant value in the manning.gr3 file. Defaults to `0.12`.
windrot_file str: Path to windrot file.
windrot float: Set constant value in the windrot_geo2proj.gr3 file. Defaults to `0.00001`.
station_flags list[int]: Define the flag for station output. Defaults to `[1,0,0,0,0,0,0,0,0]`.
coastal_monitoring bool: Flag for setting all land/island boundary nodes as stations.
Defaults to `False`.
obs str: Path to csv file for station locations. Defaults to `misc/critech.csv`.
nspool_sta int: Related to station nodes setup. Defaults to `1`.
"""
rfolder = kwargs.get("rfolder", None)
if rfolder:
self.read_folder(**kwargs)
self.geometry = kwargs.get("geometry", None)
if self.geometry:
if isinstance(self.geometry, dict):
self.lon_min = self.geometry["lon_min"]
self.lon_max = self.geometry["lon_max"]
self.lat_min = self.geometry["lat_min"]
self.lat_max = self.geometry["lat_max"]
elif isinstance(self.geometry, str):
try:
geo = gp.GeoDataFrame.from_file(self.geometry)
except:
logger.error("geometry argument not a valid geopandas file")
sys.exit(1)
(
self.lon_min,
self.lat_min,
self.lon_max,
self.lat_max,
) = geo.total_bounds
# coastlines
coastlines = kwargs.get("coastlines", None)
if coastlines is not None:
try:
coast = gp.GeoDataFrame.from_file(coastlines)
except:
coast = gp.GeoDataFrame(coastlines)
self.coastlines = coast
if not hasattr(self, "start_date"):
start_date = kwargs.get("start_date", None)
self.start_date = pd.to_datetime(start_date)
if not hasattr(self, "end_date"):
if "time_frame" in kwargs:
time_frame = kwargs.get("time_frame", None)
self.end_date = self.start_date + pd.to_timedelta(time_frame)
elif "end_date" in kwargs:
end_date = kwargs.get("end_date", None)
self.end_date = pd.to_datetime(end_date)
self.time_frame = self.end_date - self.start_date
if not hasattr(self, "date"):
self.date = get_value(self, kwargs, "date", self.start_date)
if not hasattr(self, "end_date"):
# ---------------------------------------------------------------------
logger.warning("model not set properly, No end_date\n")
# ---------------------------------------------------------------------
self.tag = kwargs.get("tag", "schism")
self.tide = kwargs.get("tide", False)
self.atm = kwargs.get("atm", True)
self.monitor = kwargs.get("monitor", False)
try:
self.epath = os.environ["SCHISM"]
except KeyError:
self.epath = kwargs.get("epath", None)
self.solver_name = SCHISM_NAME
for attr, value in kwargs.items():
if not hasattr(self, attr):
setattr(self, attr, value)
setattr(self, "misc", {})
pyposeidon.d3d
¶
Main d3d module of pyposeidon. It controls the creation, output & execution of a complete simulation based on DELFT3D.
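For orientation, a minimal sketch of a regular-grid run (hypothetical paths; it assumes the class is importable as `pyposeidon.d3d.d3d` and that a working DELFT3D installation is reachable through the `D3D`/`LD3D` environment variables or the `epath`/`lpath` arguments):
import pyposeidon.d3d as pd3d
b = pd3d.d3d(
geometry={"lon_min": -30.0, "lon_max": -10.0, "lat_min": 60.0, "lat_max": 70.0},
start_date="2018-10-1",
time_frame="12H",
resolution=0.1,
meteo_source="./data/uvp_2018100100.grib",  # hypothetical path
dem_source="./data/dem.nc",  # hypothetical path
rpath="./test/d3d/",
)
b.create()  # mesh, bathymetry, boundaries, meteo, mdf config
b.run()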
d3d
¶
Source code in pyposeidon/d3d.py
class d3d:
def __init__(self, **kwargs):
"""
Create a D3D solver
!!! danger ""
Due to a limitation of the Library rendering the docstrings, all arguments are marked
as `required`, nevertheless they are all `Optional`.
Args:
geometry Union[dict, str, GeoDataFrame]: A `GeoDataFrame` or the path to a shapefile or
a dict defining the lat/lon window.
start_date str: The date from which the analysis should start. It should be a string parseable
by `pd.to_datetime()`.
end_date str: The date at which the analysis should end. It should be a string parseable by
`pd.to_datetime()`.
time_frame str: The duration of the analysis. It should be a string parseable by
`pd.to_datetime()`.
date str: Reference date of the run.
meteo_source str: Path or url to meteo data.
dem_source str: Path or url to bathymetric data.
argfile str: Path to `_hydro.xml` file.
update list[str]: Control the update of the model e.g `['dem']`-> updates only bathymetry.
Defaults to `["all"]`.
rpath str: Path for output of the model. Defaults to `./d3d/`.
tide str: Flag indicating whether to load "tide". Defaults to `False`.
atm bool: The solver's atm. Defaults to `True`.
tag str: The model's "tag". Defaults to `"d3d"`.
resolution float: size of the regular grid. Defaults to `0.1`.
ofilename str: Path to station file. Defaults to `None`.
epath str: The path to the d3d executable. If the `D3D` env variable has been
set, then it overrides the value passed as the parameter.
config_file str: Path to mdf file. Defaults to `None`.
config dict: Parameter options passed to mdf file.
output bool: Flag for saving to file. Defaults to `False`.
"""
self.geometry = kwargs.get("geometry", None)
if self.geometry:
if isinstance(self.geometry, dict):
self.lon_min = self.geometry["lon_min"]
self.lon_max = self.geometry["lon_max"]
self.lat_min = self.geometry["lat_min"]
self.lat_max = self.geometry["lat_max"]
elif isinstance(self.geometry, str):
try:
geo = gp.GeoDataFrame.from_file(self.geometry)
except:
logger.error("geometry argument not a valid geopandas file")
sys.exit(1)
(
self.lon_min,
self.lat_min,
self.lon_max,
self.lat_max,
) = geo.total_bounds
start_date = kwargs.get("start_date", None)
self.start_date = pd.to_datetime(start_date)
if "time_frame" in kwargs:
time_frame = kwargs.get("time_frame", None)
self.end_date = self.start_date + pd.to_timedelta(time_frame)
self.time_frame = time_frame
elif "end_date" in kwargs:
end_date = kwargs.get("end_date", None)
self.end_date = pd.to_datetime(end_date)
self.time_frame = self.end_date - self.start_date
if not hasattr(self, "date"):
self.date = self.start_date
if not hasattr(self, "end_date"):
# ---------------------------------------------------------------------
logger.warning("model not set properly, No end_date\n")
# ---------------------------------------------------------------------
self.tag = kwargs.get("tag", "d3d")
self.resolution = kwargs.get("resolution", 0.1)
self.irange = kwargs.get("irange", [0, -1, 1])
self.tide = kwargs.get("tide", False)
self.atm = kwargs.get("atm", True)
self.ofilename = kwargs.get("ofilename", None)
self.solver_name = D3D_NAME
try:
self.epath = os.environ["D3D"]
except KeyError:
self.epath = kwargs.get("epath", None)
for attr, value in kwargs.items():
if not hasattr(self, attr):
setattr(self, attr, value)
# ============================================================================================
# CONFIG
# ============================================================================================
def config(self, **kwargs):
mdf_file = kwargs.get("config_file", None)
dic = get_value(self, kwargs, "parameters", None)
if mdf_file:
self.mdf = pd.read_csv(mdf_file, sep="=")
else:
self.mdf = pd.read_csv(DATA_PATH + "default.mdf", sep="=")
self.mdf = self.mdf.set_index(self.mdf.columns[0]) # set index
mdfidx = self.mdf.index.str.strip() # store the stripped names
# define mesh file
self.mdf.loc[self.mdf.index.str.contains("Filcco")] = "#{}#".format(self.tag + ".grd")
# define enc file
self.mdf.loc[self.mdf.index.str.contains("Filgrd")] = "#{}#".format(self.tag + ".enc")
# define dep file
self.mdf.loc[self.mdf.index.str.contains("Fildep")] = "#{}#".format(self.tag + ".dep")
# define obs file
if self.ofilename:
self.mdf.loc[self.mdf.index.str.contains("Filsta")] = "#{}#".format(self.tag + ".obs")
else:
self.mdf.loc[self.mdf.index.str.contains("Filsta")] = "##"
# adjust ni,nj
nj, ni = self.nj, self.ni
self.mdf.loc[self.mdf.index.str.contains("MNKmax")] = "{} {} {}".format(ni + 1, nj + 1, 1) # add one like ddb
# adjust iteration date
self.mdf.loc[self.mdf.index.str.contains("Itdate")] = "#{}#".format(self.date.strftime(format="%Y-%m-%d"))
# set time unit
self.mdf.loc[self.mdf.index.str.contains("Tunit")] = "#M#"
# adjust iteration start
Tstart = self.start_date.hour * 60
self.mdf.loc[self.mdf.index.str.contains("Tstart")] = Tstart
# adjust iteration stop
Tstop = Tstart + int(pd.to_timedelta(self.time_frame).total_seconds() / 60)
self.mdf.loc[self.mdf.index.str.contains("Tstop")] = Tstop
# adjust time for output
mstep = get_value(self, kwargs, "map_step", 60)
hstep = get_value(self, kwargs, "his_step", 0)
pstep = get_value(self, kwargs, "pp_step", 0)
rstep = get_value(self, kwargs, "restart_step", 0)
if rstep == -1: # save a restart file at the end
rstep = Tstop
self.mdf.loc[self.mdf.index.str.contains("Flmap")] = "{:d} {:d} {:d}".format(Tstart, mstep, Tstop)
self.mdf.loc[self.mdf.index.str.contains("Flhis")] = "{:d} {:d} {:d}".format(Tstart, hstep, Tstop)
self.mdf.loc[self.mdf.index.str.contains("Flpp")] = "{:d} {:d} {:d}".format(Tstart, pstep, Tstop)
self.mdf.loc[self.mdf.index.str.contains("Flrst")] = rstep
# time interval to smooth the hydrodynamic boundary conditions
self.mdf.loc[self.mdf.index.str.contains("Tlfsmo")] = 0.0
if not self.atm:
self.mdf.loc["Sub1"] = " "
# set tide only run
if self.tide:
self.mdf.loc[self.mdf.index.str.contains("Filbnd")] = "#{}#".format(self.tag + ".bnd")
self.mdf.loc[self.mdf.index.str.contains("Filana")] = "#{}#".format(self.tag + ".bca")
# if 'Tidfor' not in order: order.append('Tidfor')
# inp['Tidfor']=[['M2','S2','N2','K2'], \
# ['K1','O1','P1','Q1'], \
# ['-----------']]
# specify ini file
# if 'Filic' not in order: order.append('Filic')
# inp['Filic']=basename+'.ini'
# netCDF output
if not "FlNcdf" in mdfidx:
self.mdf.reindex(self.mdf.index.values.tolist() + ["FlNcdf "])
self.mdf.loc["FlNcdf "] = "#map his#"
other = kwargs.get("config", None)
if other:
# Check for any other mdf variable in input
for key, val in other.items():
if key in mdfidx:
self.mdf.loc[self.mdf.index.str.contains(key)] = val
else:
self.mdf.loc[key] = val
output = kwargs.get("output", False)
if output:
# save mdf
path = get_value(self, kwargs, "rpath", "./d3d/")
self.mdf.to_csv(path + self.tag + ".mdf", sep="=")
# ============================================================================================
# METEO
# ============================================================================================
def force(self, **kwargs):
meteo_source = get_value(self, kwargs, "meteo_source", None)
kwargs.update({"meteo_source": meteo_source})
flag = get_value(self, kwargs, "update", [])
# check if files exist
z = {**self.__dict__, **kwargs} # merge self and possible kwargs
if flag:
if ("meteo" in flag) | ("all" in flag):
self.meteo = pmeteo.Meteo(**z)
else:
logger.info("skipping meteo files ..\n")
else:
self.meteo = pmeteo.Meteo(**z)
@staticmethod
def from_force(filename=None, name=None):
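# Parse a Delft3D meteo ASCII file (*.amp/*.amu/*.amv) back into an
# xarray DataArray: the "key = value" header becomes attrs, the TIME
# lines become the time coordinate and the numeric blocks the data.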
df = pd.read_csv(filename, header=0, names=["data"], index_col=None, low_memory=False)
tlines = df[df.data.str.contains("TIME")].index # rows which start with TIME
# get attrs
d1 = df.loc[0 : tlines[0] - 1, "data"].str.split("=", 2, expand=True)
d1.columns = ["key", "value"] # assign column names
d1.key = d1.key.str.strip() # cleanup spaces
d1.value = d1.value.str.strip()
attrs = dict(zip(d1.key, d1.value)) # create dict
for key in ["n_cols", "n_rows", "n_quantity"]: # str -> int
attrs[key] = int(attrs[key])
for key in ["x_llcenter", "dx", "y_llcenter", "dy", "NODATA_value"]:
attrs[key] = float(attrs[key])
# get time reference
d2 = df.loc[tlines, "data"].str.split("=", 2, expand=True)
d2 = d2.drop(d2.columns[0], axis=1)
d2.columns = ["data"]
d2 = d2.loc[:, "data"].str.split(" ", 4, expand=True)
d2 = d2.drop(d2.columns[[0, 2, 3]], axis=1)
d2.columns = ["hours", "time0"]
d2.hours = d2.hours.apply(pd.to_numeric)
d2.time0 = pd.to_datetime(d2.time0.values)
d2 = d2.reset_index(drop=True)
# create timestamps
time = []
for i in range(d2.shape[0]):
time.append(d2.time0[0] + pd.DateOffset(hours=int(d2.loc[i, "hours"])))
d2["time"] = time
# get the float numbers
d3 = df.drop(np.arange(0, tlines[0]))
d3 = d3.drop(tlines)
# data = []
# for i in range(d3.values.shape[0]):
# row = d3.values[i][0].split(' ')
# row = [np.float(x) for x in row]
# data.append(row)
# data = np.array(data) # make array
data = d3[d3.columns[0]].str.split(" ", attrs["n_cols"], expand=True).to_numpy().astype(float)
data = data.reshape(d2.shape[0], attrs["n_rows"], attrs["n_cols"]) # reshape
# define lat/lon
lon = [attrs["x_llcenter"] + attrs["dx"] * i for i in np.arange(attrs["n_cols"])]
lat = [attrs["y_llcenter"] + attrs["dy"] * i for i in np.arange(attrs["n_rows"])]
# create an xarray
da = xr.DataArray(
data,
dims=["time", "latitude", "longitude"],
coords={"time": d2.time, "latitude": lat, "longitude": lon},
name=name,
)
da.attrs = attrs
return da
@staticmethod
def to_force(ar, **kwargs):
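# Write the Delft3D meteo forcing files (p.amp, u.amu, v.amv) on an
# equidistant grid; rows are flipped when latitude is stored descending.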
logger.info("writing meteo files ..\n")
path = kwargs.get("rpath", "./d3d/")
[p, u, v] = kwargs.get("vars", [None, None, None])
curvi = kwargs.get("curvi", False)
flip = np.diff(ar.latitude.values)[0]
dlat = np.abs(flip)
dlon = np.diff(ar.longitude.values)[0]
lat0 = ar.latitude.data.min()
lon0 = ar.longitude.data.min()
nodata = -9999.000
pp = ar[p].fillna(nodata).values
uu = ar[u].fillna(nodata).values
vv = ar[v].fillna(nodata).values
if not os.path.exists(path):
os.makedirs(path)
# open files
pfid = open(path + "p.amp", "w")
ufid = open(path + "u.amu", "w")
vfid = open(path + "v.amv", "w")
fi = [pfid, ufid, vfid]
wi = [ufid, vfid]
# write file headers
for f in fi:
f.write("FileVersion = 1.03\n")
if curvi:
for f in fi:
f.write("Filetype = meteo_on_curvilinear_grid\n")
f.write("grid_file = wind.grd\n")
f.write("first_data_value = grid_ulcorner\n")
f.write("data_row = grid_row\n")
else:
for f in fi:
f.write("Filetype = meteo_on_equidistant_grid\n")
f.write("n_cols = {}\n".format(ar[u].shape[2]))
f.write("n_rows = {}\n".format(ar[u].shape[1]))
f.write("grid_unit = degree\n")
# code currently assumes lon and lat are increasing
f.write("x_llcenter = {:g}\n".format(lon0))
f.write("dx = {:g}\n".format(dlon))
f.write("y_llcenter = {:g}\n".format(lat0))
f.write("dy = {:g}\n".format(dlat))
for f in fi:
f.write("NODATA_value = {:.3f}\n".format(nodata))
f.write("n_quantity = 1\n")
ufid.write("quantity1 = x_wind\n")
vfid.write("quantity1 = y_wind\n")
pfid.write("quantity1 = air_pressure\n")
for f in wi:
f.write("unit1 = m s-1\n")
pfid.write("unit1 = Pa\n")
time0 = pd.to_datetime("2000-01-01 00:00:00")
# write time blocks
indx = ar.time.values - time0.to_datetime64()
indx = indx.astype("timedelta64[m]") / 60
for it in range(indx.size): # nt + 0 hour
for f in fi:
f.write("TIME = {} hours since 2000-01-01 00:00:00 +00:00\n".format(indx[it].astype(int)))
if flip < 0:
np.savetxt(pfid, np.flipud(pp[it, :, :]), fmt="%.8f")
np.savetxt(ufid, np.flipud(uu[it, :, :]), fmt="%.8f")
np.savetxt(vfid, np.flipud(vv[it, :, :]), fmt="%.8f")
else:
np.savetxt(pfid, pp[it, :, :], fmt="%.8f")
np.savetxt(ufid, uu[it, :, :], fmt="%.8f")
np.savetxt(vfid, vv[it, :, :], fmt="%.8f")
# close files
for f in fi:
f.close()
# ============================================================================================
# DEM
# ============================================================================================
@staticmethod
def from_dep(filename, **kwargs):
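# Read a Delft3D .dep bathymetry file, dropping the trailing nodata
# row and column that the format appends.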
rdem = np.loadtxt(filename)
dr = xr.DataArray(rdem[:-1, :-1], name="ival", dims=["k", "l"])
return dr
def bath(self, **kwargs):
kwargs["grid_x"] = self.mesh.Dataset.lons.values
kwargs["grid_y"] = self.mesh.Dataset.lats.values
dpath = get_value(self, kwargs, "dem_source", None)
kwargs.update({"dem_source": dpath})
flag = get_value(self, kwargs, "update", [])
# check if files exist
if flag:
if ("dem" in flag) | ("all" in flag):
kwargs.update(
{
"lon_min": self.lon_min,
"lat_min": self.lat_min,
"lon_max": self.lon_max,
"lat_max": self.lat_max,
}
)
self.dem = pdem.Dem(**kwargs)
else:
logger.info("reading local dem file ..\n")
dem_source = z["rpath"] + self.tag + ".dep"
rdem = from_dep(dem_source)
else:
kwargs.update(
{
"lon_min": self.lon_min,
"lat_min": self.lat_min,
"lon_max": self.lon_max,
"lat_max": self.lat_max,
}
)
self.dem = pdem.Dem(**kwargs)
@staticmethod
def to_dep(dr, dry_mask=True, **kwargs):
# save dem
logger.info("writing dem file ..\n")
path = kwargs.get("rpath", "./d3d/")
flag = kwargs.get("update", None)
tag = kwargs.get("tag", "d3d")
try:
try:
bat = -dr.fval.values.astype(float) # reverse for the hydro run/use the adjusted values
# mask = bat==999999
except AttributeError:
bat = -dr.ival.values.astype(float) # reverse for the hydro run/revert to interpolated values
nj, ni = bat.shape
if dry_mask:
mask = ~np.isnan(bat) # mask out potential nan points
mask[mask] = np.less(bat[mask], 0) # get mask for dry points
bat[mask] = np.nan # mask dry points
# append the line/column of nodata
nodata = np.empty(ni)
nodata.fill(np.nan)
bat1 = np.vstack((bat, nodata))
nodata = np.empty((nj + 1, 1))
nodata.fill(np.nan)
bat2 = np.hstack((bat1, nodata))
bat2[np.isnan(bat2)] = -999.0
except AttributeError:
logger.warning("problem with dem Dataset ..")
# Write bathymetry file
if flag:
if ("all" in flag) or ("dem" in flag):
np.savetxt(path + tag + ".dep", bat2)
else:
logger.info("keeping dem file ..\n")
else:
np.savetxt(path + tag + ".dep", bat2)
# ============================================================================================
# BOUNDARY CONDITIONS TODO
# ============================================================================================
def bc(self, **kwargs):
# define boundaries
z = self.__dict__.copy()
z["lons"] = self.mesh.Dataset.lons[0, :]
z["lats"] = self.mesh.Dataset.lats[:, 0]
try:
ba = -self.dem.Dataset.ival.astype(float)
# ba[ba<0]=np.nan
z["dem"] = ba
z["cn"] = 10
z.update(kwargs)
self.bound = Box(**z)
except:
logger.info("boundary files not set..\n")
def to_bnd(self, **kwargs):
# save bnd
path = get_value(self, kwargs, "rpath", "./d3d/")
with open(path + self.tag + ".bnd", "w") as f:
dd = OrderedDict(
[
("North", self.bound.North),
("South", self.bound.South),
("West", self.bound.West),
("East", self.bound.East),
]
)
# for key,val in self.bound.__dict__.items():
for i, (key, val) in enumerate(dd.items()): # to match deltares
idx = 1
for k1, k2 in val:
bname = key + str(idx)
f.write(
"{0:<10s}{1:>12s}{2:>2s}{3:>6d}{4:>6d}{5:>6d}{6:>6d} 0.0000000e+00 {7:<s}{8:<g}A {9:<s}{10:<g}B\n".format(
bname,
nm[0],
nm[1],
k1[0] + 1,
k1[1] + 1,
k2[0] + 1,
k2[1] + 1,
key,
idx,
key,
idx,
)
) # fortran index ??
idx += 1
def to_bca(self, **kwargs):
# save bca
path = get_value(self, kwargs, "rpath", "./d3d/")
with open(path + self.tag + ".bca", "w") as f:
dd = OrderedDict(
[
("North", self.tide.North),
("South", self.tide.South),
("West", self.tide.West),
("East", self.tide.East),
]
)
# for key,val in self.tide.__dict__.items():
for i, (key, val) in enumerate(dd.items()): # to match deltares
idx = 1
if val:
l = np.arange(val.ampl.shape[0]) + idx
nl = [x for pair in zip(l, l) for x in pair]
sl = val.ampl.shape[0] * ["A", "B"]  # endpoint labels A/B for each boundary section
for t1, t2, amp, phase in zip(np.transpose(nl), np.transpose(sl), val.ampl, val.phase):
f.write("{}{}{}\n".format(key, t1, t2))
for a, b, c in zip(val.constituents, amp.flatten(), phase.flatten()):
f.write("{0:<3s} {1:<.7e} {2:<.7e}\n".format(a, b, c))
def tidebc(self, **kwargs):
self.tide = tide()
for key, val in self.bound.__dict__.items():
# compute tide constituents
tval = []
if len(val) > 0.0:
blons = []
blats = []
for l1, l2 in val:
blons.append(self.mesh.Dataset.lons[l1[1] - 1, l1[0] - 1])
blats.append(self.mesh.Dataset.lats[l1[1] - 1, l1[0] - 1])
blons.append(self.mesh.Dataset.lons[l2[1] - 1, l2[0] - 1])
blats.append(self.mesh.Dataset.lats[l2[1] - 1, l2[0] - 1])
blons = np.array(blons) # .ravel().reshape(-1,2)[:,0]
blats = np.array(blats) # .ravel().reshape(-1,2)[:,1]
# print(bound,blons,blats)
tval = tide(tmodel=self.tmodel, tpath=self.tpath, blons=blons, blats=blats)
setattr(self.tide, key, tval)
def to_obs(self, **kwargs):
# save obs
ofilename = get_value(self, kwargs, "ofilename", None)
flag = get_value(self, kwargs, "update", [])
if ofilename:
obs_points = pd.read_csv(
ofilename,
delimiter="\t",
header=None,
names=["index", "Name", "lat", "lon"],
)
obs_points = obs_points.set_index("index", drop=True).reset_index(drop=True) # reset index if any
obs_points = obs_points[
(
obs_points.lon.between(
self.mesh.Dataset.lons.values.min(),
self.mesh.Dataset.lons.values.max(),
)
)
& (
obs_points.lat.between(
self.mesh.Dataset.lats.values.min(),
self.mesh.Dataset.lats.values.max(),
)
)
]
obs_points.reset_index(inplace=True, drop=True)
try:
bat = -self.dem.Dataset.fval.values.astype(float) # reverse for the hydro run/use the adjusted values
# mask = bat==999999
except AttributeError:
bat = -self.dem.Dataset.ival.values.astype(
float
) # reverse for the hydro run/revert to interpolated values
b = np.ma.masked_array(bat, np.isnan(bat)) # mask land
i_indx, j_indx = self.vpoints(self.mesh.Dataset, obs_points, b, **kwargs)
obs_points["i"] = i_indx
obs_points["j"] = j_indx
# drop NaN points
obs = obs_points.dropna().copy()
obs = obs.reset_index(drop=True) # reset index
obs["i"] = obs["i"].values.astype(int)
obs["j"] = obs["j"].values.astype(int)
obs["new_lat"] = self.mesh.Dataset.y[obs.i.values].values # Valid point
obs["new_lon"] = self.mesh.Dataset.x[obs.j.values].values
self.obs = obs # store it
obs.Name = obs.Name.str.strip().apply(lambda name: name.replace(" ", "")) # Remove spaces to write to file
sort = sorted(obs.Name.values, key=len) # sort the names to get the biggest word
try:
wsize = len(sort[-1])  # size of the biggest name, to align the columns below
except:
pass
if flag:
if ("all" in flag) | ("model" in flag):
# Add one in the indices due to python/fortran convention
try:
with open(self.rpath + "{}.obs".format(self.tag), "w") as f:
for l in range(obs.shape[0]):
f.write(
"{0:<{3}}{1:>{3}}{2:>{3}}\n".format(
obs.Name[l][:20], obs.j[l] + 1, obs.i[l] + 1, wsize
)
)
except: # TODO
pass
else:
try:
# Add one in the indices due to python/fortran convention
with open(self.rpath + "{}.obs".format(self.tag), "w") as f:
for l in range(obs.shape[0]):
f.write(
"{0:<{3}}{1:>{3}}{2:>{3}}\n".format(obs.Name[l][:20], obs.j[l] + 1, obs.i[l] + 1, wsize)
)
except:
pass
    # ============================================================================================
    # EXECUTION
    # ============================================================================================
    def create(self, **kwargs):
        if not kwargs:
            kwargs = self.__dict__.copy()

        # Grid
        self.mesh = pmesh.set(type="r2d", **kwargs)

        # set lat/lon from file
        if hasattr(self, "mesh_file"):
            kwargs.update({"lon_min": self.mesh.Dataset.x.values.min()})
            kwargs.update({"lon_max": self.mesh.Dataset.x.values.max()})
            kwargs.update({"lat_min": self.mesh.Dataset.y.values.min()})
            kwargs.update({"lat_max": self.mesh.Dataset.y.values.max()})

        nj, ni = self.mesh.Dataset.lons.shape
        self.nj, self.ni = nj, ni
        kwargs.update({"ni": ni, "nj": nj})

        # get bathymetry
        self.bath(**kwargs)
        # get boundaries
        self.bc()
        # get meteo
        if self.atm:
            self.force(**kwargs)
        # get tide
        if self.tide:
            self.tidebc()

        self.config(**kwargs)
    def run(self, **kwargs):
        calc_dir = get_value(self, kwargs, "rpath", "./d3d/")

        try:
            bin_path = os.environ["D3D"]
        except KeyError:
            bin_path = get_value(self, kwargs, "epath", None)
        try:
            lib_path = os.environ["LD3D"]
        except KeyError:
            lib_path = get_value(self, kwargs, "lpath", None)

        if bin_path is None:
            # ------------------------------------------------------------------------------
            logger.warning("D3D executable path (epath) not given -> using default\n")
            # ------------------------------------------------------------------------------

        ncores = get_value(self, kwargs, "ncores", 0)
        argfile = get_value(self, kwargs, "argfile", self.tag + "_hydro.xml")

        tools.create_d3d_mpirun_script(
            target_dir=calc_dir,
            script_name="run_flow2d3d.sh",
        )

        # ---------------------------------------------------------------------
        logger.info("executing model\n")
        # ---------------------------------------------------------------------

        # note that cwd is the folder where the executable is
        ex = subprocess.Popen(
            args=["./run_flow2d3d.sh {} {} {}".format(argfile, bin_path, lib_path)],
            cwd=calc_dir,
            shell=True,
            stderr=subprocess.PIPE,
            stdout=subprocess.PIPE,
        )

        with open(calc_dir + self.tag + "_run.log", "w") as f:  # save output
            for line in iter(ex.stdout.readline, b""):
                f.write(line.decode(sys.stdout.encoding))
            for line in iter(ex.stderr.readline, b""):
                logger.info(line.decode(sys.stdout.encoding))

        # append the largest tri-diag diagnostics file to the run log
        tempfiles = glob.glob(calc_dir + "/tri-diag." + self.tag + "-*")
        try:
            biggest = max(tempfiles, key=(lambda tf: os.path.getsize(tf)))
            with open(calc_dir + self.tag + "_run.log", "a") as f:  # save diagnosis
                with open(biggest, "r") as f1:
                    for line in f1:
                        f.write(line)
        except ValueError:  # no diagnostics files were produced
            pass

        # cleanup
        tempfiles = glob.glob(calc_dir + "/tri-diag." + self.tag + "-*") + glob.glob(calc_dir + "/TMP_*")
        for filename in tempfiles:
            try:
                os.remove(filename)
            except OSError:
                pass

        ex.stdout.close()
        ex.stderr.close()

        # ---------------------------------------------------------------------
        logger.info("FINISHED\n")
        # ---------------------------------------------------------------------
    def save(self, **kwargs):
        path = get_value(self, kwargs, "rpath", "./d3d/")

        lista = [key for key, value in self.__dict__.items() if key not in ["meteo", "dem", "mesh"]]
        dic = {k: self.__dict__.get(k, None) for k in lista}

        mesh = self.__dict__.get("mesh", None)
        if isinstance(mesh, str):
            dic.update({"mesh": mesh})
        else:
            dic.update({"mesh": mesh.__class__.__name__})

        dem = self.__dict__.get("dem", None)
        if isinstance(dem, str):
            dic.update({"dem": dem})
        elif isinstance(dem, pdem.Dem):
            dic.update({"dem": dem.Dataset.elevation.attrs})

        meteo = self.__dict__.get("meteo", None)
        if isinstance(meteo, str):
            dic.update({"meteo": meteo})
        elif isinstance(meteo, pmeteo.Meteo):
            dic.update({"meteo": meteo.Dataset.attrs})

        dic["version"] = pyposeidon.__version__

        # make the values json-serializable
        for attr, value in dic.items():
            if isinstance(value, datetime.datetime):
                dic[attr] = dic[attr].isoformat()
            if isinstance(value, pd.Timedelta):
                dic[attr] = dic[attr].isoformat()
            if isinstance(value, pd.DataFrame):
                dic[attr] = dic[attr].to_dict()

        with open(path + self.tag + "_model.json", "w") as f:
            json.dump(dic, f, default=myconverter)
    def output(self, **kwargs):
        path = get_value(self, kwargs, "rpath", "./d3d/")
        slevel = get_value(self, kwargs, "slevel", 0.0)
        flag = get_value(self, kwargs, "update", [])

        nj, ni = self.mesh.Dataset.lons.shape

        if not os.path.exists(path):
            os.makedirs(path)

        # save mdf file
        self.mdf.to_csv(path + self.tag + ".mdf", sep="=")

        # save mesh file
        if flag:
            if ("all" in flag) | ("mesh" in flag):
                self.mesh.to_file(filename=path + self.tag + ".grd")
            else:
                logger.info("skipping mesh file ..\n")
        else:
            self.mesh.to_file(filename=path + self.tag + ".grd")

        # save bathymetry file
        self.to_dep(self.dem.Dataset, rpath=path, tag=self.tag, update=flag)

        # save meteo
        if self.atm:
            try:
                self.to_force(self.meteo.Dataset, vars=["msl", "u10", "v10"], rpath=path, **kwargs)
            except AttributeError:
                logger.warning("no meteo data available.. no update..\n")

        # save obs file (static method: the model instance is passed explicitly)
        self.to_obs(self, **kwargs)

        # save enc file: trace the grid outline with one-based indices (add one, like ddb)
        if (not flag) or ("all" in flag) or ("model" in flag):
            with open(path + self.tag + ".enc", "w") as f:
                f.write("{:>5}{:>5}\n".format(ni + 1, 1))
                f.write("{:>5}{:>5}\n".format(ni + 1, nj + 1))
                f.write("{:>5}{:>5}\n".format(1, nj + 1))
                f.write("{:>5}{:>5}\n".format(1, 1))
                f.write("{:>5}{:>5}\n".format(ni + 1, 1))

        calc_dir = get_value(self, kwargs, "rpath", "./d3d/")

        try:
            bin_path = os.environ["D3D"]
        except KeyError:
            bin_path = get_value(self, kwargs, "epath", None)
        try:
            lib_path = os.environ["LD3D"]
        except KeyError:
            lib_path = get_value(self, kwargs, "lpath", None)

        if bin_path is None:
            # ---------------------------------------------------------------------
            logger.warning("D3D executable path (epath) not given\n")
            # ---------------------------------------------------------------------
        if lib_path is None:
            # ---------------------------------------------------------------------
            logger.warning("D3D libraries path (lpath) not given\n")
            # ---------------------------------------------------------------------

        ncores = get_value(self, kwargs, "ncores", NCORES)

        if not os.path.exists(calc_dir + self.tag + "_hydro.xml"):
            # edit and save the config file
            copy2(DATA_PATH + "config_d_hydro.xml", calc_dir + self.tag + "_hydro.xml")
            xml = md.parse(calc_dir + self.tag + "_hydro.xml")
            xml.getElementsByTagName("mdfFile")[0].firstChild.replaceWholeText(self.tag + ".mdf")
            with open(calc_dir + self.tag + "_hydro.xml", "w") as f:
                xml.writexml(f)

        if not os.path.exists(calc_dir + "run_flow2d3d.sh"):
            copy2(DATA_PATH + "run_flow2d3d.sh", calc_dir + "run_flow2d3d.sh")

        # make the script executable
        execf = calc_dir + "run_flow2d3d.sh"
        mode = os.stat(execf).st_mode
        mode |= (mode & 0o444) >> 2  # copy R bits to X
        os.chmod(execf, mode)

        # ---------------------------------------------------------------------
        logger.info("output done\n")
        # ---------------------------------------------------------------------
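    # Illustrative example (not part of the original source): for ni=3, nj=2 the
    # enclosure written above traces the grid outline with one-based corner
    # indices, closing on the first point:
    #
    #     4    1
    #     4    3
    #     1    3
    #     1    1
    #     4    1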
    @staticmethod
    def vpoints(grid, obs_points, bat, **kwargs):
        idx = []
        jdx = []
        for m in range(obs_points.shape[0]):
            lat, lon = obs_points.loc[m, ["lat", "lon"]]
            nearest = grid.sel(x=[lon], y=[lat], method="nearest")
            j = np.abs(grid.x.values - nearest.x.values).argmin()
            i = np.abs(grid.y.values - nearest.y.values).argmin()
            if bat[i, j]:
                idx.append(i)
                jdx.append(j)
            else:
                bnear = bat[i - 5 : i + 6, j - 5 : j + 6]  # nearby grid nodes
                rlon = grid.lons[i - 5 : i + 6, j - 5 : j + 6] - lon
                rlat = grid.lats[i - 5 : i + 6, j - 5 : j + 6] - lat
                rad = np.sqrt(rlon**2 + rlat**2)  # radial distance from the obs point
                rmask = rad.values[bnear.mask == False]  # mask the distances with the valid mask from the dem
                rmask.sort()  # sort to start close and move further away
                if rmask.size > 0:
                    for r in rmask:  # find the closest valid point
                        [[k, l]] = np.argwhere(rad.values == r)
                        if bnear[k - 1 : k + 1, l - 1 : l + 1].mask.sum() == 0:
                            break  # the point is valid
                    xv = rad[k, l].x.values  # lon of the valid point
                    yv = rad[k, l].y.values  # lat of the valid point
                    # final i, j
                    j = np.abs(grid.x.values - xv).argmin()
                    i = np.abs(grid.y.values - yv).argmin()
                    idx.append(i)
                    jdx.append(j)
                else:
                    idx.append(np.nan)
                    jdx.append(np.nan)
        return idx, jdx
    def execute(self, **kwargs):
        self.create(**kwargs)
        self.output(**kwargs)
        self.save(**kwargs)
        self.run(**kwargs)
    def read_folder(self, rfolder, **kwargs):
        gfile = glob.glob(rfolder + "/*.grd")  # mesh
        dfile = glob.glob(rfolder + "/*.dep")  # bathymetry
        u = glob.glob(rfolder + "/*.amu")  # meteo u-wind
        v = glob.glob(rfolder + "/*.amv")  # meteo v-wind
        p = glob.glob(rfolder + "/*.amp")  # meteo pressure
        mfile = glob.glob(rfolder + "/*.mdf")  # config; assumed intent -- the original listing referenced an undefined `d[0]`

        try:
            # config
            self.mdf = pd.read_csv(mfile[0], sep="=")
            self.mdf = self.mdf.set_index(self.mdf.columns[0])  # set index
            # mesh
            self.mesh = pmesh.set("r2d", mesh_file=gfile[0])
            # bathymetry
            self.dem.Dataset = d3d.from_dep(dfile[0])
            # meteo
            mf = []
            mf.append(d3d.from_force(u[0], "u10"))
            mf.append(d3d.from_force(v[0], "v10"))
            mf.append(d3d.from_force(p[0], "msl"))
            self.meteo.Dataset = xr.merge(mf)
        except IndexError:  # assumed error path: a required file is missing from the folder
            # ---------------------------------------------------------------------
            logger.exception("folder incomplete. Abort\n")
            sys.exit(1)
            # ---------------------------------------------------------------------

    def get_output_data(self, **kwargs):
        dic = self.__dict__
        dic.update(kwargs)
        self.data = data.get_output(**dic)
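The methods above are chained by `execute()`: `create()` builds the mesh, bathymetry, boundaries and forcing, `output()` writes the model files, `save()` dumps the model metadata to json, and `run()` launches the solver. As a rough usage sketch (hypothetical paths, dates and window; the import location is an assumption, not stated on this page):

```python
# a minimal sketch, assuming the class above is importable as pyposeidon.d3d.d3d
from pyposeidon.d3d import d3d

model = d3d(
    geometry={"lon_min": -30.0, "lon_max": -10.0, "lat_min": 60.0, "lat_max": 70.0},
    start_date="2018-10-01",
    time_frame="12h",
    meteo_source="./era5.grib",  # hypothetical meteo file
    dem_source="./gebco.nc",     # hypothetical bathymetry file
    rpath="./test/d3d/",
)
model.execute()  # create() -> output() -> save() -> run()
```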
__init__(self, **kwargs)
special
¶
Create a D3D solver.
Due to a limitation of the Library rendering the docstrings, all arguments are marked
as `required`, nevertheless they are all `Optional`.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| geometry | Union[dict, str, GeoDataFrame] | A `GeoDataFrame` or the path to a shapefile or a dict defining the lat/lon window. | required |
| start_date | str | The date from which the analysis should start. It should be a string parseable by `pd.to_datetime()`. | required |
| end_date | str | The date at which the analysis should end. It should be a string parseable by `pd.to_datetime()`. | required |
| time_frame | str | The duration of the analysis. It should be a string parseable by `pd.to_datetime()`. | required |
| date | str | Reference date of the run. | required |
| meteo_source | str | Path or url to meteo data. | required |
| dem_source | str | Path or url to bathymetric data. | required |
| argfile | str | Path to the `_hydro.xml` file. | required |
| update | list[str] | Control the update of the model, e.g. `['dem']` updates only the bathymetry. Defaults to `["all"]`. | required |
| rpath | str | Path for the output of the model. Defaults to `./d3d/`. | required |
| tide | bool | Flag indicating whether to load "tide". Defaults to `False`. | required |
| atm | bool | Flag indicating whether to apply atmospheric (meteo) forcing. Defaults to `True`. | required |
| tag | str | The model's "tag". Defaults to `"d3d"`. | required |
| resolution | float | Size of the regular grid. Defaults to `0.1`. | required |
| ofilename | str | Path to the station file. Defaults to `None`. | required |
| epath | str | The path to the D3D executable. If the `D3D` env variable has been set, it overrides the value passed as the parameter. | required |
| config_file | str | Path to the mdf file. Defaults to `None`. | required |
| config | dict | Parameter options passed to the mdf file. | required |
| output | bool | Flag for saving to file. Defaults to `False`. | required |
Source code in pyposeidon/d3d.py
    def __init__(self, **kwargs):
        """
        Create a D3D solver

        !!! danger ""
            Due to a limitation of the Library rendering the docstrings, all arguments are marked
            as `required`, nevertheless they are all `Optional`.

        Args:
            geometry Union[dict, str, GeoDataFrame]: A `GeoDataFrame` or the path to a shapefile or
                a dict defining the lat/lon window.
            start_date str: The date from which the analysis should start. It should be a string parseable
                by `pd.to_datetime()`.
            end_date str: The date at which the analysis should end. It should be a string parseable by
                `pd.to_datetime()`.
            time_frame str: The duration of the analysis. It should be a string parseable by
                `pd.to_datetime()`.
            date str: Reference date of the run.
            meteo_source str: Path or url to meteo data.
            dem_source str: Path or url to bathymetric data.
            argfile str: Path to the `_hydro.xml` file.
            update list[str]: Control the update of the model, e.g. `['dem']` updates only the bathymetry.
                Defaults to `["all"]`.
            rpath str: Path for the output of the model. Defaults to `./d3d/`.
            tide bool: Flag indicating whether to load "tide". Defaults to `False`.
            atm bool: Flag indicating whether to apply atmospheric (meteo) forcing. Defaults to `True`.
            tag str: The model's "tag". Defaults to `"d3d"`.
            resolution float: Size of the regular grid. Defaults to `0.1`.
            ofilename str: Path to the station file. Defaults to `None`.
            epath str: The path to the D3D executable. If the `D3D` env variable has been
                set, then it overrides the value passed as the parameter.
            config_file str: Path to the mdf file. Defaults to `None`.
            config dict: Parameter options passed to the mdf file.
            output bool: Flag for saving to file. Defaults to `False`.
        """
        self.geometry = kwargs.get("geometry", None)

        if self.geometry:
            if isinstance(self.geometry, dict):
                self.lon_min = self.geometry["lon_min"]
                self.lon_max = self.geometry["lon_max"]
                self.lat_min = self.geometry["lat_min"]
                self.lat_max = self.geometry["lat_max"]
            elif isinstance(self.geometry, str):
                try:
                    geo = gp.GeoDataFrame.from_file(self.geometry)
                except Exception:
                    logger.error("geometry argument not a valid geopandas file")
                    sys.exit(1)
                (
                    self.lon_min,
                    self.lat_min,
                    self.lon_max,
                    self.lat_max,
                ) = geo.total_bounds

        start_date = kwargs.get("start_date", None)
        self.start_date = pd.to_datetime(start_date)

        if "time_frame" in kwargs:
            time_frame = kwargs.get("time_frame", None)
            self.end_date = self.start_date + pd.to_timedelta(time_frame)
            self.time_frame = time_frame
        elif "end_date" in kwargs:
            end_date = kwargs.get("end_date", None)
            self.end_date = pd.to_datetime(end_date)
            self.time_frame = self.end_date - self.start_date

        if not hasattr(self, "date"):
            self.date = self.start_date

        if not hasattr(self, "end_date"):
            # ---------------------------------------------------------------------
            logger.warning("model not set properly, No end_date\n")
            # ---------------------------------------------------------------------

        self.tag = kwargs.get("tag", "d3d")
        self.resolution = kwargs.get("resolution", 0.1)
        self.irange = kwargs.get("irange", [0, -1, 1])
        self.tide = kwargs.get("tide", False)
        self.atm = kwargs.get("atm", True)
        self.ofilename = kwargs.get("ofilename", None)

        self.solver_name = D3D_NAME

        try:
            self.epath = os.environ["D3D"]
        except KeyError:
            self.epath = kwargs.get("epath", None)

        for attr, value in kwargs.items():
            if not hasattr(self, attr):
                setattr(self, attr, value)
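Note the time-window logic above: `time_frame` takes precedence over `end_date`, and either one determines the other. A small sketch of the equivalence (hypothetical dates):

```python
import pandas as pd

start = pd.to_datetime("2018-10-01")

# passing time_frame="12h" yields:
end_a = start + pd.to_timedelta("12h")

# passing end_date="2018-10-01 12:00" instead yields the same window,
# with time_frame derived from the difference:
end_b = pd.to_datetime("2018-10-01 12:00")
time_frame = end_b - start

assert end_a == end_b and time_frame == pd.Timedelta("12h")
```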
pyposeidon.mjigsaw
¶
jigsaw module
get(contours, **kwargs)
¶
Create a `jigsaw` mesh.
Due to a limitation of the Library rendering the docstrings, all arguments are marked
as `required`, nevertheless they are all `Optional`.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| contours | GeoDataFrame | Provide boundaries and metadata. | required |
| tag | str | Identifier. Defaults to `"jigsaw"`. | required |
| rpath | str | Path for output. Defaults to `"."`. | required |
| dem_source | str | Path or url to bathymetric data. | required |
| bgmesh | str | Path to a mesh scale file. Defaults to `None`. | required |
| setup_only | bool | Flag for setup only (no execution). Defaults to `False`. | required |
| hfun_scal | str | Mesh scale option, either "RELATIVE" or "ABSOLUTE". Defaults to "ABSOLUTE". | required |
| hfun_min | float | Minimum mesh size. Defaults to `0.0`. | required |
| hfun_max | float | Maximum mesh size. Defaults to `1000.`. | required |
Source code in pyposeidon/mjigsaw.py
def get(contours, **kwargs):
    """
    Create a `jigsaw` mesh.

    !!! danger ""
        Due to a limitation of the Library rendering the docstrings, all arguments are marked
        as `required`, nevertheless they are all `Optional`.

    Args:
        contours GeoDataFrame: Provide boundaries and metadata.
        tag str: Identifier. Defaults to `"jigsaw"`.
        rpath str: Path for output. Defaults to `"."`.
        dem_source str: Path or url to bathymetric data.
        bgmesh str: Path to a mesh scale file. Defaults to `None`.
        setup_only bool: Flag for setup only (no execution). Defaults to `False`.
        hfun_scal str: Mesh scale option, either "RELATIVE" or "ABSOLUTE". Defaults to "ABSOLUTE".
        hfun_min float: Minimum mesh size. Defaults to `0.0`.
        hfun_max float: Maximum mesh size. Defaults to `1000.`.
    """
    logger.info("Creating JIGSAW files\n")

    tag = kwargs.get("tag", "jigsaw")
    rpath = kwargs.get("rpath", ".")

    if not os.path.exists(rpath):
        os.makedirs(rpath)

    path = rpath + "/jigsaw/"
    if not os.path.exists(path):
        os.makedirs(path)

    # HFUN FILE
    bgmesh = kwargs.get("bgmesh", None)
    if bgmesh is None:
        dem_source = kwargs.get("dem_source", None)
        if dem_source:
            bgmesh = "auto"
            kwargs.update({"bgmesh": "auto"})

    dh = None  # background mesh-size dataset, if any
    if bgmesh is not None:
        logger.info("Set background scale")
        if bgmesh.endswith(".nc"):
            try:
                dh = xr.open_dataset(bgmesh)
                if "longitude" in dh.coords:
                    to_hfun_grid(dh, path + tag + "-hfun.msh")  # write bgmesh file
                else:
                    to_hfun_mesh(dh, path + tag + "-hfun.msh")
            except Exception:
                logger.warning("bgmesh failed... continuing without background mesh size")
                bgmesh = None
                dh = None
        elif bgmesh == "auto":
            dh = make_bgmesh(contours, **kwargs)
        elif bgmesh.endswith(".msh"):
            pass  # the file is used as-is

    bg = dh

    # GEO FILE
    to_geo(contours, path=path, tag=tag)

    # JIG FILE
    fjig = path + "/" + tag + ".jig"

    hfun_scal = kwargs.get("hfun_scal", "ABSOLUTE")
    hfun_min = kwargs.get("hfun_min", 0.0)
    hfun_max = kwargs.get("hfun_max", 1000.0)

    with open(fjig, "w") as f:
        f.write("GEOM_FILE ={}\n".format(tag + "-geo.msh"))
        f.write("MESH_FILE ={}\n".format(tag + ".msh"))
        if bgmesh:
            f.write("HFUN_FILE ={}\n".format(tag + "-hfun.msh"))
        f.write("HFUN_SCAL = {}\n".format(hfun_scal))
        f.write("HFUN_HMAX = {}\n".format(hfun_max))
        f.write("HFUN_HMIN = {}\n".format(hfun_min))
        f.write("MESH_DIMS = 2\n")
        f.write("MESH_TOP1 = TRUE\n")
        f.write("MESH_EPS1 = 1.0\n")
        f.write("MESH_RAD2 = 1\n")
        f.write("GEOM_FEAT = TRUE\n")
        f.write("VERBOSITY = 2")

    calc_dir = rpath + "/jigsaw/"

    # EXECUTE
    setup_only = kwargs.get("setup_only", False)
    if setup_only:
        return None, bg

    # ---------------------------------
    logger.info("executing jigsaw\n")
    # ---------------------------------
    ex = subprocess.Popen(
        args=["jigsaw {}".format(tag + ".jig")],
        cwd=calc_dir,
        shell=True,
        stderr=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    with open(calc_dir + "err.log", "w") as f:
        for line in iter(ex.stderr.readline, b""):
            f.write(line.decode(sys.stdout.encoding))
    ex.stderr.close()
    with open(calc_dir + "run.log", "w") as f:
        for line in iter(ex.stdout.readline, b""):
            f.write(line.decode(sys.stdout.encoding))
    ex.stdout.close()
    # ---------------------------------
    logger.info("Jigsaw FINISHED\n")
    # ---------------------------------

    gr = read_msh(calc_dir + tag + ".msh", **kwargs)
    logger.info("..done creating mesh\n")
    return gr, bg
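A usage sketch, under the assumption that `contours` is a `GeoDataFrame` of boundaries prepared beforehand (mesh-size values are illustrative):

```python
import pyposeidon.mjigsaw as mjigsaw

mesh, bg = mjigsaw.get(
    contours,             # GeoDataFrame with boundaries and metadata
    tag="jigsaw",
    rpath="./test/",
    hfun_scal="ABSOLUTE",
    hfun_min=0.005,       # illustrative mesh-size bounds
    hfun_max=0.1,
    setup_only=False,
)
# mesh is the mesh read back from jigsaw's .msh output; bg is the background
# mesh-size dataset, or None when no background scale was used
```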
pyposeidon.mgmsh
¶
gmsh module
get(contours, **kwargs)
¶
Create a `gmsh` mesh.
Due to a limitation of the Library rendering the docstrings, all arguments are marked
as `required`, nevertheless they are all `Optional`.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| contours | GeoDataFrame | Provide boundaries and metadata. | required |
| rpath | str | Path for output. Defaults to `"."`. | required |
| use_bindings | bool | Flag for using the python API as opposed to the binary. Defaults to `True`. | required |
| dem_source | str | Path or url to bathymetric data. | required |
| bgmesh | str | Path to a mesh scale file. Defaults to `None`. | required |
| setup_only | bool | Flag for setup only (no execution). Defaults to `False`. | required |
Source code in pyposeidon/mgmsh.py
def get(contours, **kwargs):
    """
    Create a `gmsh` mesh.

    !!! danger ""
        Due to a limitation of the Library rendering the docstrings, all arguments are marked
        as `required`, nevertheless they are all `Optional`.

    Args:
        contours GeoDataFrame: Provide boundaries and metadata.
        rpath str: Path for output. Defaults to `"."`.
        use_bindings bool: Flag for using the python API as opposed to the binary. Defaults to `True`.
        dem_source str: Path or url to bathymetric data.
        bgmesh str: Path to a mesh scale file. Defaults to `None`.
        setup_only bool: Flag for setup only (no execution). Defaults to `False`.
    """
    logger.info("Creating grid with GMSH\n")

    rpath = kwargs.get("rpath", ".")
    if not os.path.exists(rpath):
        os.makedirs(rpath)

    gpath = os.path.join(rpath, "gmsh")
    if not os.path.exists(gpath):
        os.makedirs(gpath)

    use_bindings = kwargs.get("use_bindings", True)
    setup_only = kwargs.get("setup_only", False)

    bgmesh = kwargs.get("bgmesh", None)
    if bgmesh is None:
        dem_source = kwargs.get("dem_source", None)
        if dem_source:
            bgmesh = "auto"
            kwargs.update({"bgmesh": "auto"})

    dh = None  # background mesh-size dataset, if any
    if bgmesh == "auto":
        try:
            fpos = gpath + "/bgmesh.pos"
            gglobal = kwargs.get("gglobal", False)
            if gglobal:
                dem = pdem.Dem(**kwargs)
                nds, lms = make_bgmesh_global(contours, fpos, dem.Dataset, **kwargs)
                dh = to_global_pos(nds, lms, fpos, **kwargs)
            else:
                dh = make_bgmesh(contours, fpos, **kwargs)
            kwargs.update({"bgmesh": fpos})
        except OSError:
            logger.warning("bgmesh failed... continuing without background mesh size")
            dh = None
            kwargs.update({"bgmesh": None})

    if use_bindings:
        logger.info("using python bindings")
        make_gmsh(contours, **kwargs)
    else:
        to_geo(contours, **kwargs)
        if not setup_only:
            logger.info("using GMSH binary")
            gmsh_execute(**kwargs)

    gr = None
    if not setup_only:
        gr = read_msh(rpath + "/gmsh/mymesh.msh", **kwargs)

    bg = dh
    return gr, bg
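Analogously, a usage sketch for the gmsh backend (same `contours` assumption; paths are hypothetical):

```python
import pyposeidon.mgmsh as mgmsh

mesh, bg = mgmsh.get(
    contours,                 # GeoDataFrame with boundaries and metadata
    rpath="./test/",
    use_bindings=True,        # use the gmsh python API rather than the binary
    dem_source="./gebco.nc",  # hypothetical bathymetry; enables the "auto" bgmesh
    setup_only=False,
)
```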