#!/usr/bin/env python
"""
interpolate.py
Written by Tyler Sutterley (05/2026)
Interpolators for spatial data
PYTHON DEPENDENCIES:
numpy: Scientific Computing Tools For Python
https://numpy.org
https://numpy.org/doc/stable/user/numpy-for-matlab-users.html
scipy: Scientific Tools for Python
https://docs.scipy.org/doc/
xarray: N-D labeled arrays and datasets in Python
https://docs.xarray.dev/en/stable/
UPDATE HISTORY:
Updated 05/2026: added parameters to allow for extrapolation with
inverse distance weighting (IDW) in addition to nearest-neighbors (NN)
added worker parameter to allow for parallel KD-tree queries
added support for geographic coordinates in inpainting
Updated 04/2026: add 1st and 2nd order barycentric interpolation function
add winding number to check elements that may cross a meridian line
Updated 03/2026: break up extrapolation into separate functions to allow
for caching of the kd-tree when interpolating multiple variables
Updated 02/2026: output data from extrapolate as an xarray DataArray
where there are no valid points within the cutoff distance
Updated 01/2026: output data from extrapolate as an xarray DataArray
Updated 11/2025: calculate lambda function after nearest-neighbors
set default data type for interpolation functions as input data type
generalize vectorized 1D linear interpolation for more cases of fp
allow iterating over variable with a recursion in interp1d
drop prior interpolation functions to prefer xarray internals
Updated 08/2025: added vectorized 1D linear interpolation function
improve performance of bilinear interpolation and allow extrapolation
added a penalized least square inpainting function to gap fill data
standardized most variable names between interpolation functions
Updated 09/2024: deprecation fix case where an array is output to scalars
Updated 07/2024: changed projection flag in extrapolation to is_geographic
Written 12/2022
"""
from __future__ import annotations
import numpy as np
import xarray as xr
import scipy.fftpack
import scipy.spatial
import pyTMD.spatial
__all__ = [
"interp1d",
"inpaint",
"extrapolate",
"_to_cartesian",
"_build_tree",
"_query_tree",
"barycentric",
"_to_barycentric",
"_inside_triangle",
"_shape_functions",
"_winding_number",
]
# PURPOSE: 1-dimensional linear interpolation on arrays
[docs]
def interp1d(
x: float | np.ndarray,
xp: np.ndarray,
fp: np.ndarray,
**kwargs,
):
"""
Vectorized one-dimensional linear interpolation
Parameters
----------
x: float | np.ndarray
x-coordinate(s) of the interpolated values
xp: np.ndarray
x-coordinates of the data points
fp: np.ndarray
y-coordinates of the data points
extrapolate: str, default = 'linear'
Method of extrapolation
- ``'linear'``
- ``'nearest'``
Returns
-------
f: np.ndarray
Interpolated values at new coordinates
"""
# get extrapolation method
extrapolate = kwargs.get("extrapolate", "linear").lower()
if extrapolate not in ("linear", "nearest"):
raise ValueError(f"Invalid extrapolate method: {extrapolate}")
# recursion for multiple x values
if isinstance(x, np.ndarray) and (x.ndim > 0):
# allocate output array
f = np.zeros((*fp.shape[:-1], len(x)), dtype=fp.dtype)
for i, val in enumerate(x):
f[..., i] = interp1d(val, xp, fp, **kwargs)
# return the array of interpolated values
return f
# clip coordinates to handle nearest-neighbor extrapolation
if extrapolate == "nearest":
x = np.clip(x, a_min=xp.min(), a_max=xp.max())
# find indice where x could be inserted into xp
j = np.searchsorted(xp, x) - 1
# clip indices to handle linear extrapolation
if extrapolate == "linear":
j = np.clip(j, a_min=0, a_max=len(xp) - 2)
# fractional distance between points
d = np.divide(x - xp[j], xp[j + 1] - xp[j])
# calculate interpolated values
f = (1.0 - d) * fp[..., j] + d * fp[..., j + 1]
# return the interpolated values
return f
[docs]
def inpaint(
xs: np.ndarray,
ys: np.ndarray,
zs: np.ndarray,
N: int = 0,
s0: int = 3,
power: int = 2,
epsilon: float = 2.0,
is_geographic: bool = False,
**kwargs,
):
"""
Inpaint over missing data in a two-dimensional array using a
penalized least-squares method based on discrete cosine transforms
:cite:p:`Garcia:2010hn,Wang:2012ei`
Parameters
----------
xs: np.ndarray
x-coordinates
ys: np.ndarray
y-coordinates
zs: np.ndarray
Data with masked values
N: int, default 0
Number of iterations (0 for nearest neighbors)
s0: int, default 3
Smoothing factor
power: int, default 2
Power for lambda function
epsilon: float, default 2.0
Relaxation factor
is_geographic: bool, default False
Input grid is in geographic coordinates
workers: int, default 1
Number of parallel workers for KD-tree query
Returns
-------
z0: np.ndarray
Data with inpainted (filled) values
"""
# set number of parallel workers for KD-tree query
workers = kwargs.get("workers", 1)
# find masked values
if isinstance(zs, np.ma.MaskedArray):
W = np.logical_not(zs.mask)
else:
W = np.isfinite(zs)
# no valid values can be found
if not np.any(W):
raise ValueError("No valid values found")
# dimensions of input grid
ny, nx = np.shape(zs)
# mask of invalid values
masked = np.logical_not(W)
# convert to Cartesian coordinates for distance calculations
xgrid, ygrid = np.meshgrid(xs, ys)
p_in = _to_cartesian(xgrid[W], ygrid[W], is_geographic=is_geographic)
p_out = _to_cartesian(
xgrid[masked], ygrid[masked], is_geographic=is_geographic
)
# calculate initial values using nearest neighbors
# computation of distance Matrix
# use scipy spatial KDTree routines
tree = _build_tree(p_in)
# find nearest neighbors
_, ii = tree.query(p_out, k=1, workers=workers)
# copy valid original values
z0 = np.zeros((ny, nx), dtype=zs.dtype)
z0[W] = np.copy(zs[W])
# copy nearest neighbors
z0[masked] = zs[W][ii]
# return nearest neighbors interpolation
if N == 0:
return z0
# copy data to new array with 0 values for mask
ZI = np.zeros((ny, nx), dtype=zs.dtype)
ZI[W] = np.copy(z0[W])
# calculate lambda function
L = np.zeros((ny, nx))
L += np.broadcast_to(np.cos(np.pi * np.arange(ny) / ny)[:, None], (ny, nx))
L += np.broadcast_to(np.cos(np.pi * np.arange(nx) / nx)[None, :], (ny, nx))
LAMBDA = np.power(2.0 * (2.0 - L), power)
# smoothness parameters
s = np.logspace(s0, -6, N)
for i in range(N):
# calculate discrete cosine transform
GAMMA = 1.0 / (1.0 + s[i] * LAMBDA)
DISCOS = GAMMA * scipy.fftpack.dctn(W * (ZI - z0) + z0, norm="ortho")
# update interpolated grid
z0 = (
epsilon * scipy.fftpack.idctn(DISCOS, norm="ortho")
+ (1.0 - epsilon) * z0
)
# reset original values
z0[W] = np.copy(zs[W])
# return the inpainted grid
return z0
# PURPOSE: extrapolate valid data to output points
[docs]
def _to_cartesian(
x: np.ndarray,
y: np.ndarray,
is_geographic: bool = True,
):
"""
Convert input coordinates to an array of points in a
Cartesian coordinate system
Parameters
----------
x: np.ndarray
x-coordinates to be converted
y: np.ndarray
y-coordinates to be converted
is_geographic: bool, default True
Coordinates are geographic
Returns
-------
points: np.ndarray
Output points in Cartesian coordinates
"""
# verify output dimensions
x = np.atleast_1d(x)
y = np.atleast_1d(y)
# calculate coordinates to query for neighboring points
if is_geographic:
# global or regional equirectangular model
# ellipsoidal major axis in kilometers
a_axis = 6378.137
# calculate Cartesian coordinates of input grid
xi, yi, zi = pyTMD.spatial.to_cartesian(x, y, a_axis=a_axis)
# calculate Cartesian coordinates of output coordinates
points = np.c_[xi, yi, zi]
else:
points = np.c_[x, y]
# return the output points in Cartesian coordinates
return points
[docs]
def _build_tree(points: np.ndarray, **kwargs):
"""
Build a KD-tree to search for neighboring points
Parameters
----------
points: np.ndarray
Input points in Cartesian coordinates
kwargs: dict
Additional keyword arguments for ``scipy.spatial.KDTree``
Returns
-------
tree: scipy.spatial.KDTree
KD-tree from input points
"""
# create KD-tree of points for nearest-neighbor extrapolation
tree = scipy.spatial.KDTree(points, **kwargs)
return tree
[docs]
def _query_tree(
tree: scipy.spatial.KDTree,
points: np.ndarray,
flattened: np.ndarray,
k: int = 1,
power: int = 2,
cutoff: int | float = np.inf,
fill_value: float = None,
**kwargs,
):
"""
Extrapolation of valid model data using KD-trees using
nearest-neighbor (NN) or inverse distance weighting (IDW)
Parameters
----------
tree: scipy.spatial.KDTree
KD-tree of valid points to query
points: np.ndarray
Output points in Cartesian coordinates
flattened: np.ndarray
Valid data array to be extrapolated
k: int, default 1
Number of nearest neighbors to use for extrapolation
power: int, default 2
Power for inverse distance weighting (IDW) extrapolation
cutoff: float, default np.inf
Return only neighbors within distance (kilometers)
fill_value: float, default None
Invalid value
dtype: np.dtype, default from input data
Output data type
workers: int, default 1
Number of parallel workers to use for KD-tree query
Set to ``-1`` to use all available cores
Returns
-------
data: xr.DataArray
Extrapolated data
"""
# set default data type
dtype = kwargs.get("dtype", flattened.dtype)
workers = kwargs.get("workers", 1)
# number of data points
npts, _ = points.shape
# query output data points and find k nearest neighbor within cutoff
dd, ii = tree.query(
points, k=k, distance_upper_bound=cutoff, workers=workers
)
# allocate to output extrapolate data array
data = np.ma.zeros((npts), dtype=dtype, fill_value=fill_value)
data.mask = np.ones((npts), dtype=bool)
# initially set all data to fill value
data.data[:] = data.fill_value
# spatially extrapolate using nearest neighbors or IDW
if k == 1 and np.any(np.isfinite(dd)):
# spatially extrapolate using nearest neighbors
(ind,) = np.nonzero(np.isfinite(dd))
data.data[ind] = flattened[ii[ind]]
data.mask[ind] = False
elif k > 1 and np.any(np.isfinite(dd)):
# clip distances to handle cases where points overlap
# this can lead to infinite weights in the IDW extrapolation
dd = np.clip(dd, a_min=1e-10, a_max=None)
# clip indices to handle cases where there are fewer than k neighbors
# distances will be inf so these points will be masked in the output
ii = np.clip(ii, a_min=0, a_max=len(flattened) - 1)
# calculate inverse distance weights for power
power_inverse_distance = np.power(dd, -power)
# normalize weights if power is greater than 0 (usually between 1 and 3)
# for power = 0, all weights are 1 and the extrapolation is an average
s = np.nansum(power_inverse_distance, axis=1)
w = power_inverse_distance / np.broadcast_to(s[:, None], (npts, k))
# spatially extrapolate using inverse distance weighting
data.data[:] = np.nansum(w * flattened[ii], axis=1)
data.mask[:] = np.logical_not(np.isfinite(dd).any(axis=1))
# return extrapolated values
return xr.DataArray(data)
[docs]
def barycentric(
xv: np.ndarray,
yv: np.ndarray,
ze: np.ndarray,
x: np.ndarray,
y: np.ndarray,
order: int = 1,
**kwargs,
):
"""
Interpolation of unstructured model data using a barycentric
method
Parameters
----------
xv: np.ndarray
x-coordinates of triangle vertices
yv: np.ndarray
y-coordinates of triangle vertices
ze: np.ndarray
Unstructured model data at elements
x: np.ndarray
Output x-coordinates
y: np.ndarray
Output y-coordinates
order: int, default 1
Polynomial order of the triangular elements
- ``1``: linear
- ``2``: quadratic
Returns
-------
data: xr.DataArray
Interpolated data
"""
# set default data type
dtype = kwargs.get("dtype", ze.dtype)
# convert to barycentric coordinates
xi, eta = _to_barycentric(xv, yv, x, y)
# check if inside polygon
valid = _inside_triangle(xi, eta)
# allocate to output extrapolate data array
data = np.zeros_like(x, dtype=dtype)
if not np.any(valid):
return xr.DataArray(data)
# get shape functions for order
N = _shape_functions(xi, eta, order)
# calculate interpolation
for p, sf in enumerate(N):
data += sf * valid * ze[..., p]
# return the interpolated value
return xr.DataArray(data)
[docs]
def _to_barycentric(
xv: np.ndarray,
yv: np.ndarray,
x: np.ndarray,
y: np.ndarray,
):
"""
Convert coordinates to barycentric space
Parameters
----------
xv: np.ndarray
x-coordinates of triangle vertices
yv: np.ndarray
y-coordinates of triangle vertices
x: np.ndarray
Output x-coordinates
y: np.ndarray
Output y-coordinates
Returns
-------
xi: np.ndarray
Normalized barycentric (areal) xi-coordinates
eta: np.ndarray
Normalized barycentric (areal) eta-coordinates
"""
# calculate triangle area
A = 0.5 * (
xv[0] * (yv[1] - yv[2])
+ xv[1] * (yv[2] - yv[0])
+ xv[2] * (yv[0] - yv[1])
)
# calculate Jacobian
# ignore divide by zero and invalid value warnings
with np.errstate(divide="ignore", invalid="ignore"):
J = 1.0 / (2.0 * A)
# mapping into barycentric coordinates
xi = J * (
(xv[1] * yv[2] - xv[2] * yv[1])
+ (yv[1] - yv[2]) * x
+ (xv[2] - xv[1]) * y
)
eta = J * (
(xv[2] * yv[0] - xv[0] * yv[2])
+ (yv[2] - yv[0]) * x
+ (xv[0] - xv[2]) * y
)
# return the barycentric coordinates
return xi, eta
[docs]
def _inside_triangle(
xi: np.ndarray,
eta: np.ndarray,
atol: float = 1e-8,
):
"""
Check if point is within the triangular area
Parameters
----------
xi: np.ndarray
Normalized barycentric (areal) xi-coordinates
eta: np.ndarray
Normalized barycentric (areal) eta-coordinates
atol: float = 1e-8
Absolute tolerance parameter
Returns
-------
valid: np.ndarray
Mask for coordinates
"""
# simple check to see if areas are valid
la = 1.0 - eta - xi
# all barycentric coordinates should be within 0 to 1
# and have valid Jacobians (not dividing by 0)
valid = (
(np.isfinite(xi) & np.isfinite(eta))
& (la >= (0.0 - atol))
& (la <= (1.0 + atol))
& (xi >= (0.0 - atol))
& (xi <= (1.0 + atol))
& (eta >= (0.0 - atol))
& (eta <= (1.0 + atol))
)
return valid
[docs]
def _shape_functions(xi: np.ndarray, eta: np.ndarray, order: int):
"""
Get the interpolating shape functions for a polynomial order
Parameters
----------
xi: np.ndarray
Normalized barycentric (areal) xi-coordinates
eta: np.ndarray
Normalized barycentric (areal) eta-coordinates
order: int
Polynomial order of the triangular elements
- ``1``: linear
- ``2``: quadratic
Returns
-------
N: list
Shape functions in barycentric space
"""
# shape functions in barycentric space
N = [None] * (3 * order)
if order == 1:
# 1st order terms: linear triangular elements
N[0] = xi
N[1] = eta
N[2] = 1.0 - eta - xi
elif order == 2:
# 2nd order terms: quadratic triangular elements
N[0] = xi * (2.0 * xi - 1.0)
N[1] = 4.0 * xi * eta
N[2] = eta * (2.0 * eta - 1.0)
N[3] = 4.0 * eta * (1.0 - xi - eta)
N[4] = (1.0 - xi - eta) * (1.0 - 2.0 * xi - 2.0 * eta)
N[5] = 4.0 * xi * (1.0 - xi - eta)
else:
raise ValueError(f"Unsupported polynomial order {order}")
# return the shape functions
return N
# PURPOSE: determine if triangle winding is counter-clockwise
[docs]
def _winding_number(xv: np.ndarray, yv: np.ndarray):
"""
Calculate the winding number of a triangle by taking the
cross-product of the vertex vectors
Parameters
----------
xv: np.ndarray
x-coordinates of triangle vertices
yv: np.ndarray
y-coordinates of triangle vertices
Returns
-------
wind: np.ndarray
Winding number of the triangle
"""
wind = (xv[1] - xv[0]) * (yv[2] - yv[0]) - (yv[1] - yv[0]) * (xv[2] - xv[0])
return wind