Source code for pyTMD.io.ATLAS

#!/usr/bin/env python
"""
ATLAS.py
Written by Tyler Sutterley (04/2026)

Reads netCDF4 ATLAS tidal solutions provided by Oregon State University

PYTHON DEPENDENCIES:
    xarray: N-D labeled arrays and datasets in Python
        https://docs.xarray.dev/en/stable/

UPDATE HISTORY:
    Updated 04/2026: added lineage attributes to save model filename(s)
    Updated 02/2026: make dataset and datatree accessors for ATLAS
        be subaccessors from dataset module
    Updated 12/2025: no longer subclassing pathlib.Path for working directories
        added option to change the output datatype when writing netCDF files
    Updated 11/2025: near-complete rewrite of program to use xarray
    Updated 08/2025: use numpy degree to radian conversions
        added option to gap fill when reading constituent grids
    Updated 11/2024: expose buffer distance for cropping tide model data
    Updated 10/2024: fix error when using default bounds in extract_constants
    Updated 07/2024: added crop and bounds keywords for trimming model data
    Updated 02/2024: changed variable for setting global grid flag to is_global
    Updated 10/2023: add generic wrapper function for reading constituents
    Updated 04/2023: using pathlib to define and expand tide model paths
    Updated 03/2023: add basic variable typing to function inputs
    Updated 12/2022: refactor tide read programs under io
        new functions to read and interpolate from constituents class
        new functions to output ATLAS formatted netCDF4 files
        refactored interpolation routines into new module
    Updated 11/2022: place some imports within try/except statements
        use f-strings for formatting verbose or ascii output
    Updated 07/2022: fix setting of masked array data to NaN
    Updated 05/2022: reformat arguments to extract_netcdf_constants definition
        changed keyword arguments to camel case
    Updated 04/2022: updated docstrings to numpy documentation format
    Updated 12/2021: adjust longitude convention based on model longitude
    Updated 09/2021: fix cases where there is no mask on constituent files
    Updated 07/2021: added check that tide model files are accessible
    Updated 06/2021: add warning for tide models being entered as string
    Updated 05/2021: added option for extrapolation cutoff in kilometers
    Updated 03/2021: add extrapolation check where there are no invalid points
        prevent ComplexWarning for fill values when calculating amplitudes
        simplified inputs to be similar to binary OTIS read program
    Updated 02/2021: set invalid values to nan in extrapolation
        replaced numpy bool to prevent deprecation warning
    Updated 12/2020: added valid data extrapolation with nearest_extrap
        replace tostring with tobytes to fix DeprecationWarning
    Updated 11/2020: create function to read bathymetry and spatial coordinates
    Updated 09/2020: set bounds error to false for regular grid interpolations
        adjust dimensions of input coordinates to be iterable
        reduce number of interpolations by copying bathymetry mask to variables
    Updated 08/2020: replaced griddata with scipy regular grid interpolators
    Updated 07/2020: added function docstrings. separate bilinear interpolation
        changed TYPE variable to keyword argument. update griddata interpolation
    Updated 06/2020: use argmin and argmax in bilinear interpolation
    Written 09/2019
"""

from __future__ import annotations

import gzip
import pathlib
import datetime
import xarray as xr
import pyTMD.version
import pyTMD.utilities
from .dataset import (
    combine_attrs,
    register_dataset_subaccessor,
    register_datatree_subaccessor,
)

# attempt imports
dask = pyTMD.utilities.import_dependency("dask")
dask_available = pyTMD.utilities.dependency_available("dask")

__all__ = [
    "open_dataset",
    "open_mfdataset",
    "open_atlas_grid",
    "open_atlas_dataset",
    "ATLASDataset",
    "ATLASDataTree",
]


[docs] def open_dataset( model_files: list[str] | list[pathlib.Path], grid_file: str | pathlib.Path, **kwargs, ): """ Open ATLAS tide model file Parameters ---------- model_files: list of str or pathlib.Path List of ATLAS model files grid_file: str or pathlib.path ATLAS model grid file kwargs: dict Additional keyword arguments for opening ATLAS files Returns ------- ds: xarray.Dataset ATLAS tide model data """ # set default keyword arguments kwargs.setdefault("group", "z") # read ATLAS grid and model files ds1 = open_atlas_grid(grid_file, **kwargs) ds2 = open_mfdataset(model_files, **kwargs) # convert transports to currents if necessary if kwargs["group"] in ("u", "v"): # convert transports to currents and update attributes for c in ds2.tmd.constituents: ds2[c] /= ds1["bathymetry"] units = str(ds2[c].tmd.units / ds1["bathymetry"].tmd.units) ds2[c].attrs["units"] = units # merge datasets ds = xr.merge([ds1, ds2], combine_attrs=combine_attrs, compat="override") # return xarray dataset return ds
# PURPOSE: read a list of ATLAS netCDF4 files
[docs] def open_mfdataset( model_files: list[str] | list[pathlib.Path], parallel: bool = False, **kwargs, ): """ Open multiple ATLAS model files Parameters ---------- model_files: list of str or pathlib.Path List of ATLAS model files parallel: bool, default False Open files in parallel using ``dask.delayed`` kwargs: dict Additional keyword arguments for opening ATLAS files Returns ------- ds: xarray.Dataset ATLAS tide model data """ # merge multiple granules if parallel and dask_available: opener = dask.delayed(open_atlas_dataset) else: opener = open_atlas_dataset # read each file as xarray dataset and append to list d = [opener(f, **kwargs) for f in model_files] # read datasets as dask arrays if parallel and dask_available: (d,) = dask.compute(d) # merge datasets ds = xr.merge(d, combine_attrs=combine_attrs, compat="override") # return xarray dataset return ds
[docs] def open_atlas_grid( input_file: str | pathlib.Path, group: str = "z", **kwargs, ): """ Open ATLAS model grid file Parameters ---------- input_file: str or pathlib.Path ATLAS model grid file group: str, default 'z' Tidal variable to read - ``'z'``: heights - ``'u'``: zonal currents - ``'U'``: zonal depth-averaged transport - ``'v'``: meridional currents - ``'V'``: meridional depth-averaged transport compressed: bool, default False Input file is ``gzip`` compressed Returns ------- ds: xarray.Dataset ATLAS tide model data """ # detect if file is compressed if not provided if kwargs.get("compressed", None) is None: kwargs["compressed"] = pyTMD.utilities.detect_compression(input_file) # tilde-expand input file input_file = pyTMD.utilities.Path(input_file).resolve() if isinstance(input_file, pathlib.Path) and not input_file.exists(): raise FileNotFoundError(f"File not found: {input_file}") # read the netCDF4-format file if kwargs["compressed"]: # read gzipped netCDF4 file f = gzip.open(input_file, "rb") tmp = xr.open_dataset(f, mask_and_scale=True) else: tmp = xr.open_dataset(input_file, mask_and_scale=True) # read bathymetry and coordinates for variable group if group == "z": # get bathymetry at nodes ds = tmp["hz"].T.to_dataset(name="bathymetry") ds.coords["x"] = tmp["lon_z"] ds.coords["y"] = tmp["lat_z"] elif group in ("U", "u"): # get bathymetry at nodes ds = tmp["hu"].T.to_dataset(name="bathymetry") ds.coords["x"] = tmp["lon_u"] ds.coords["y"] = tmp["lat_u"] elif group in ("V", "v"): # get bathymetry at nodes ds = tmp["hv"].T.to_dataset(name="bathymetry") ds.coords["x"] = tmp["lon_v"] ds.coords["y"] = tmp["lat_v"] # mask invalid bathymetries ds = ds.where(ds.bathymetry != 0, None, drop=False) # swap dimension names ds = ds.swap_dims(dict(nx="x", ny="y")) # add attributes ds.attrs["group"] = group ds.attrs["format"] = "ATLAS" ds.attrs["lineage"] = pathlib.Path(input_file).name # return xarray dataset return ds
# PURPOSE: reads ATLAS netCDF4 files
[docs] def open_atlas_dataset( input_file: str | pathlib.Path, group: str = "z", chunks: int | dict | str | None = None, **kwargs, ): """ Open ATLAS-formatted netCDF4 files Parameters ---------- input_file: str or pathlib.Path Input ATLAS file group: str, default 'z' Tidal variable to read - ``'z'``: heights - ``'u'``: zonal currents - ``'U'``: zonal depth-averaged transport - ``'v'``: meridional currents - ``'V'``: meridional depth-averaged transport chunks: int, dict, str, or None, default None Variable chunk sizes for dask (see ``xarray.open_dataset``) compressed: bool, default False Input file is ``gzip`` compressed Returns ------- ds: xarray.Dataset ATLAS tide model data """ # detect if file is compressed if not provided if kwargs.get("compressed", None) is None: kwargs["compressed"] = pyTMD.utilities.detect_compression(input_file) # tilde-expand input file input_file = pyTMD.utilities.Path(input_file).resolve() if isinstance(input_file, pathlib.Path) and not input_file.exists(): raise FileNotFoundError(f"File not found: {input_file}") # read the netCDF4-format file if kwargs["compressed"]: # read gzipped netCDF4 file f = gzip.open(input_file, "rb") tmp = xr.open_dataset(f, mask_and_scale=True, chunks=chunks) else: tmp = xr.open_dataset(input_file, mask_and_scale=True, chunks=chunks) # constituent name con = tmp["con"].values.astype("|S").tobytes().decode("utf-8").strip() if group == "z": ds = (tmp["hRe"].T + 1j * tmp["hIm"].T).to_dataset(name=con) ds.coords["x"] = tmp["lon_z"] ds.coords["y"] = tmp["lat_z"] ds[con].attrs["units"] = tmp["hRe"].attrs.get("units") elif group in ("U", "u"): ds = (tmp["uRe"].T + 1j * tmp["uIm"].T).to_dataset(name=con) ds.coords["x"] = tmp["lon_u"] ds.coords["y"] = tmp["lat_u"] ds[con].attrs["units"] = tmp["uRe"].attrs.get("units") elif group in ("V", "v"): ds = (tmp["vRe"].T + 1j * tmp["vIm"].T).to_dataset(name=con) ds.coords["x"] = tmp["lon_v"] ds.coords["y"] = tmp["lat_v"] ds[con].attrs["units"] = tmp["vRe"].attrs.get("units") # swap dimension names ds = ds.swap_dims(dict(nx="x", ny="y")) # add attributes ds.attrs["format"] = "ATLAS" ds.attrs["group"] = group.upper() if group in ("u", "v") else group ds.attrs["lineage"] = pathlib.Path(input_file).name # return xarray dataset return ds
# PURPOSE: ATLAS-netcdf utilities for xarray Datasets
[docs] @register_dataset_subaccessor("atlas") class ATLASDataset: """ ``xarray.Dataset`` utilities for ATLAS-netcdf tidal models """ def __init__(self, ds): self._ds = ds # PURPOSE: output grid file in ATLAS netCDF format
[docs] def to_grid( self, path: str | pathlib.Path, mode: str = "w", encoding: dict = {"zlib": True, "complevel": 9}, astype: str = "float32", **kwargs, ): """ Writes grid data to netCDF4 files in ATLAS format Parameters ---------- path: str or pathlib.Path Output ATLAS-netcdf grid file name mode: str, default 'w' netCDF4 file mode encoding: dict, default {"zlib": True, "complevel": 9} netCDF4 variable compression settings kwargs: dict Additional keyword arguments for ``xarray`` netCDF4 writer """ # tilde-expand output file path = pyTMD.utilities.Path(path).resolve() # set variable names for group group = self._ds.attrs["group"].lower() depth_key = f"h{group}" lon_key = f"lon_{group}" lat_key = f"lat_{group}" # set default encoding kwargs.setdefault("encoding", {depth_key: encoding}) # coordinate remapping mapping_coords = dict(x=lon_key, y=lat_key) attrs = {lon_key: {}, lat_key: {}, depth_key: {}} # set variable attributes attrs[lon_key]["units"] = "degrees_east" attrs[lon_key]["long_name"] = f"longitude of {group.upper()} nodes" attrs[lat_key]["units"] = "degrees_north" attrs[lat_key]["long_name"] = f"latitude of {group.upper()} nodes" units = self._ds["bathymetry"].attrs.get("units", "meters") attrs[depth_key]["units"] = units attrs[depth_key]["long_name"] = f"Bathymetry at {group.upper()} nodes" attrs[depth_key]["field"] = "bath, scalar" # create output xarray dataset ds = xr.Dataset() ds[depth_key] = self._ds["bathymetry"].astype(astype) # rename dimensions ds = ds.swap_dims(dict(x="nx", y="ny")) # remap coordinates to ATLAS convention ds = ds.rename(mapping_coords) # add global attributes ds.attrs["title"] = "ATLAS bathymetry data" ds.attrs["group"] = "OTIS grid file" ds.attrs["date_created"] = datetime.datetime.now().isoformat() ds.attrs["software_reference"] = pyTMD.version.project_name ds.attrs["software_version"] = pyTMD.version.full_version # set variable attributes for key, value in attrs.items(): ds[key].attrs.update(value) # output to netCDF4 file ds.to_netcdf(path, mode=mode, **kwargs)
# PURPOSE: output tidal constituent data in ATLAS netCDF format
[docs] def to_netcdf( self, path: str | pathlib.Path, mode: str = "w", encoding: dict = {"zlib": True, "complevel": 9}, astype: str = "float32", **kwargs, ): """ Writes tidal constituents to netCDF4 files in ATLAS format Parameters ---------- path: str or pathlib.Path Output directory for ATLAS-netcdf files mode: str, default 'w' netCDF4 file mode encoding: dict, default {"zlib": True, "complevel": 9} netCDF4 variable compression settings kwargs: dict Additional keyword arguments for ``xarray`` netCDF4 writer """ # tilde-expand output directory path = pyTMD.utilities.Path(path).resolve() # set variable names group = self._ds.attrs["group"].lower() type_key = dict(z="h", u="U", v="V")[group] lon_key = f"lon_{group}" lat_key = f"lat_{group}" # set default encoding default_encoding = {f"{type_key}{c}": encoding for c in ("Re", "Im")} kwargs.setdefault("encoding", default_encoding) # coordinate remapping mapping_coords = dict(x=lon_key, y=lat_key) attrs = {lon_key: {}, lat_key: {}} # set variable attributes attrs[lon_key]["units"] = "degrees_east" attrs[lon_key]["long_name"] = f"longitude of {group.upper()} nodes" attrs[lat_key]["units"] = "degrees_north" attrs[lat_key]["long_name"] = f"latitude of {group.upper()} nodes" # build variable attributes for real and imaginary components for key, val in dict(Re="Real part", Im="Imag part").items(): # variable units and long_name attributes if group == "z": long_name = f"Tidal elevation complex amplitude, {val}" elif group == "u": long_name = f"Tidal WE transport complex amplitude, {val}" elif group == "v": long_name = f"Tidal SN transport complex amplitude, {val}" # variable field description fields = [] fields.append(f"{key}({type_key}), scalar") fields.append(f"amp=abs({type_key}Re+i*{type_key}Im)") fields.append(f"GMT phase=atan2(-{type_key}Im,{type_key}Re)/pi*180") # set variable attributes attrs[f"{type_key}{key}"] = {} attrs[f"{type_key}{key}"]["long_name"] = long_name attrs[f"{type_key}{key}"]["field"] = "; ".join(fields) # create output xarray dataset for each constituent for v in self._ds.tmd.constituents: # create xarray dataset ds = xr.Dataset() # extract real and imaginary components ds[f"{type_key}Re"] = self._ds[v].real.astype(astype) ds[f"{type_key}Im"] = self._ds[v].imag.astype(astype) # define and fill constituent ID ds["con"] = v.ljust(4).encode("utf8") # rename dimensions ds = ds.swap_dims(dict(x="nx", y="ny")) # remap coordinates to ATLAS convention ds = ds.rename(mapping_coords) # update variable attributes for att_name, att_val in attrs.items(): ds[att_name].attrs.update(att_val) ds[att_name].attrs["units"] = self._ds[v].attrs["units"] ds["con"].attrs["_Encoding"] = "utf8" ds["con"].attrs["long_name"] = "tidal constituent" # add global attributes if group == "z": ds.attrs["title"] = "ATLAS tidal elevation file" ds.attrs["group"] = "OTIS elevation file" elif group in ("u", "v"): ds.attrs["title"] = "ATLAS tidal SN and WE transports file" ds.attrs["group"] = "OTIS transport file" ds.attrs["date_created"] = datetime.datetime.now().isoformat() ds.attrs["software_reference"] = pyTMD.version.project_name ds.attrs["software_version"] = pyTMD.version.full_version # write ATLAS netCDF4 file FILE = path.joinpath(f"{v}.nc") ds.to_netcdf(FILE, mode=mode, **kwargs)
# PURPOSE: ATLAS-netcdf utilities for xarray DataTrees
[docs] @register_datatree_subaccessor("atlas") class ATLASDataTree: """ ``xarray.DataTree`` utilities for ATLAS-netcdf tidal models """ def __init__(self, dtree): self._dtree = dtree
[docs] def to_netcdf( self, grid_file: str | pathlib.Path, directory: str | pathlib.Path | None = None, **kwargs, ): """ Writes netCDF4 files in ATLAS format Parameters ---------- grid_file: str or pathlib.Path Output ATLAS-netcdf grid file directory: str or pathlib.Path Output directory for ATLAS-netcdf files kwargs: dict Additional keyword arguments for netCDF4 writer """ # tilde-expand grid file grid_file = pyTMD.utilities.Path(grid_file).resolve() # set default output directory directory = grid_file.parent if directory is None else directory # for each model group for group in ("z", "u", "v"): # get xarray dataset for group ds = self._dtree[group].to_dataset() # write in append mode to add group to same grid and directory # output grid file ATLASDataset(ds).to_grid(grid_file, group=group, mode="a", **kwargs) # output constituent files ATLASDataset(ds).to_netcdf( directory, group=group, mode="a", **kwargs )