from typing import Union, List, Any
from numpy.typing import ArrayLike
import numpy as np
from pyinterpolate.core.data_models.points import VariogramPoints
from pyinterpolate.core.validators.experimental_semivariance import \
validate_bins, validate_direction_and_tolerance
from pyinterpolate.semivariogram.experimental.functions.covariance import \
covariance_fn
from pyinterpolate.semivariogram.experimental.functions.directional import \
from_ellipse
from pyinterpolate.semivariogram.experimental.functions.general import \
_omnidirectional_variogram
from pyinterpolate.semivariogram.lags.lags import get_lags
[docs]
def calculate_covariance(ds: Union[ArrayLike, VariogramPoints] = None,
values: ArrayLike = None,
geometries: ArrayLike = None,
step_size: float = None,
max_range: float = None,
direction: float = None,
tolerance: float = None,
custom_bins: Union[Any, np.ndarray] = None
) -> np.ndarray:
"""
Calculates experimental covariance.
Parameters
----------
ds : ArrayLike, optional
``[x, y, value]``
values : ArrayLike, optional
Observation in the i-th geometry (from ``geometries``). Optional
parameter, if not given then ``ds`` must be provided.
geometries : ArrayLike, optional
Array or similar structure with geometries. It must have the same
length as ``values``. Optional parameter, if not given then ``ds``
must be provided. Point type geometry.
step_size : float
The fixed distance between lags grouping point neighbors.
max_range : float
The maximum distance at which the covariance is calculated.
direction : float, optional
Direction of covariogram, values from 0 to 360 degrees:
- 0 or 180: is E-W,
- 90 or 270 is N-S,
- 45 or 225 is NE-SW,
- 135 or 315 is NW-SE.
tolerance : float, optional
If ``tolerance`` is 0 then points must be placed at a single line with
the beginning in the origin of the coordinate system and the
direction given by y axis and direction parameter.
If ``tolerance`` is ``> 0`` then the bin is selected as an elliptical
area with major axis pointed in the same direction as the line for
``0`` tolerance.
* The major axis size == ``step_size``.
* The minor axis size is ``tolerance * step_size``
* The baseline point is at a center of the ellipse.
* The ``tolerance == 1`` creates an omnidirectional covariogram.
custom_bins : numpy array, optional
Custom bins for covariance calculation. If provided, then parameter
``step_size`` is ignored and ``max_range`` is set to the final bin
distance.
Returns
-------
covariance : numpy array
``[lag, covariance, number of point pairs]``
Notes
-----
# Covariance
It is a measure of similarity between points over different distances.
We assume that the close observations tend to be similar
(recall the Tobler's Law).
We calculate the empirical covariance as:
.. math:: covariance = 1 / (N) * SUM(i=1, N) [z(x_i + h) * z(x_i)] - u^2
where:
- :math:`N` - number of observation pairs,
- :math:`h` - distance (lag),
- :math:`z(x_i)` - value at location :math:`z_i`,
- :math:`(x_i + h)` - location at a distance :math:`h` from :math:`x_i`,
- :math:`u` - average value of observations at a given lag distance.
As an output we get array of lags :math:`h`, covariances :math:`c` and
number of points within each lag :math:`n`.
# Directional Covariogram
The assumption that our observations change in the same way in every
direction is rarely true. Let's consider temperature. It changes from
the equator to the poles so in the N-S and S-N axes. The good idea is to
test the correlation of our observations in a few different directions.
The main difference between an omnidirectional covariogram and
a directional covariogram is that we take into account a different
subset of neighbors:
- Omnidirectional covariogram: we test neighbors in a circle,
- Directional covariogram: we test neighbors within an ellipse,
and one direction is major.
Examples
--------
>>> import numpy as np
>>> REFERENCE_INPUT = np.array([
... [0, 0, 8],
... [1, 0, 6],
... [2, 0, 4],
... [3, 0, 3],
... [4, 0, 6],
... [5, 0, 5],
... [6, 0, 7],
... [7, 0, 2],
... [8, 0, 8],
... [9, 0, 9],
... [10, 0, 5],
... [11, 0, 6],
... [12, 0, 3]
... ])
>>> STEP_SIZE = 1
>>> MAX_RANGE = 4
>>> covariances = calculate_covariance(
... values=REFERENCE_INPUT[:, -1],
... geometries=REFERENCE_INPUT[:, :-1],
... step_size=STEP_SIZE,
... max_range=MAX_RANGE
... )
>>> print(covariances[0][0])
[ 1. -0.54340278 24. ]
>>> print(covariances[1])
4.2485207100591715
"""
# Validate points
if not isinstance(ds, VariogramPoints):
ds = VariogramPoints(points=ds,
geometries=geometries,
values=values)
# Validation
# Validate points
if not isinstance(ds, VariogramPoints):
ds = VariogramPoints(points=ds)
# Validate bins
validate_bins(step_size, max_range, custom_bins)
# Validate direction and tolerance
validate_direction_and_tolerance(direction, tolerance)
# Calculations
# Get lags
lags = get_lags(step_size, max_range, custom_bins)
# Get covariances
if direction is not None and tolerance is not None:
experimental_covariances = _directional_covariance(
ds.points,
lags,
direction,
tolerance
)
else:
experimental_covariances = _omnidirectional_covariance(
ds.points, lags
)
return experimental_covariances
def _directional_covariance(points: np.ndarray,
lags: Union[List, np.ndarray],
direction: float,
tolerance: float):
"""
Function calculates directional covariances.
Parameters
----------
points : numpy array
``[x, y, value]``
lags : numpy array
direction : float, optional
Direction of semivariogram, values from 0 to 360 degrees:
- 0 or 180: is E-W,
- 90 or 270 is N-S,
- 45 or 225 is NE-SW,
- 135 or 315 is NW-SE.
tolerance : float, optional
If ``tolerance`` is 0 then points must be placed at a single line with
the beginning in the origin of the coordinate system and the
direction given by y axis and direction parameter.
If ``tolerance`` is ``> 0`` then the bin is selected as an elliptical
area with major axis pointed in the same direction as the line for
``0`` tolerance.
* The major axis size == ``step_size``.
* The minor axis size is ``tolerance * step_size``
* The baseline point is at a center of the ellipse.
* The ``tolerance == 1`` creates an omnidirectional semivariogram.
Returns
-------
: (numpy array)
``[lag, covariance, number of point pairs]``
"""
output_covariances = from_ellipse(covariance_fn,
points,
lags,
direction,
tolerance)
return output_covariances
def _omnidirectional_covariance(points: np.array, lags: np.array) -> np.array:
"""Function calculates covariance from given points.
Parameters
----------
points : numpy array
lags : Iterable
Returns
-------
covariances : numpy array
"""
sorted_covariances = _omnidirectional_variogram(
fn=covariance_fn,
points=points,
lags=lags,
custom_weights=None
)
return sorted_covariances