Source code for pyinterpolate.semivariogram.indicator.indicator

from copy import deepcopy
from typing import Union, List, Any
from numpy.typing import ArrayLike

import numpy as np
from matplotlib import pyplot as plt
from matplotlib import markers
from tqdm import tqdm

from pyinterpolate.core.data_models.points import VariogramPoints
from pyinterpolate.semivariogram.experimental.classes.experimental_variogram import ExperimentalVariogram
from pyinterpolate.semivariogram.theoretical.classes.theoretical_variogram import TheoreticalVariogram


def code_indicators(thresholds: ArrayLike,
                    ds: Union[ArrayLike, VariogramPoints] = None,
                    values: ArrayLike = None,
                    geometries: ArrayLike = None) -> np.ndarray:
    """
    Function transforms kriging values into a vector of their indicators.

    Parameters
    ----------
    thresholds : List
        The list of possible thresholds.

    ds : ArrayLike, optional
        ``[x, y, value]``

    values : ArrayLike, optional
        Observation in the i-th geometry (from ``geometries``). Optional
        parameter, if not given then ``ds`` must be provided.

    geometries : ArrayLike, optional
        Array or similar structure with geometries. It must have the same
        length as ``values``. Optional parameter, if not given then ``ds``
        must be provided. Point type geometry.

    Returns
    -------
    ids : numpy array
        [lon, lat, bin value for thresh 0, ..., bin value for thresh n].

    """

    ids = []
    thresh_arr = np.array(thresholds)

    # Validate points
    if not isinstance(ds, VariogramPoints):
        ds = VariogramPoints(points=ds,
                             geometries=geometries,
                             values=values)
        ds = ds.points

    for row in ds:
        _r = [row[0], row[1]]
        _val = row[2]
        _indicators = _val <= thresh_arr
        _r.extend(_indicators.astype(int))
        ids.append(_r)

    ids = np.array(ids)
    return ids


def select_variogram_thresholds(ds: ArrayLike,
                                n_thresh: int) -> List[float]:
    """
    Function selects ``n_thresh`` thresholds of a sample dataset from
    its histogram, it divides histogram based on the n-quantiles.

    Parameters
    ----------
    ds : Iterable
        Data values used for interpolation.

    n_thresh : int
        The number of thresholds.

    Returns
    -------
    thresholds : List
        Thresholds used for indicator Kriging.
    """

    quantiles = np.linspace(0, 1, n_thresh+1)

    thresholds = [np.quantile(ds, q=q) for q in quantiles[1:]]

    return thresholds


class IndicatorVariogramData:
    """
    Class describes indicator variogram data.

    Parameters
    ----------
    number_of_thresholds: int
        The number of thresholds to model data.

    ds : ArrayLike, optional
        ``[x, y, value]``

    values : ArrayLike, optional
        Observation in the i-th geometry (from ``geometries``). Optional
        parameter, if not given then ``ds`` must be provided.

    geometries : ArrayLike, optional
        Array or similar structure with geometries. It must have the same
        length as ``values``. Optional parameter, if not given then ``ds``
        must be provided. Point type geometry.

    Attributes
    ----------
    input_array : numpy array, list, tuple
        Coordinates and their values: ``(pt x, pt y, value)``

    n_thresholds: int
        The number of thresholds to model data.

    thresholds : numpy array
        The 1D numpy array with thresholds.

    ids : numpy array
        The numpy array with
        ``[coordinate_x, coordinate_y, threshold_0, ..., threshold_n]``.

    See Also
    --------
    ExperimentalIndicatorVariogram
        Class that calculates experimental variograms for each indicator.

    """

    def __init__(self,
                 number_of_thresholds: int,
                 ds: Union[ArrayLike, VariogramPoints] = None,
                 values: ArrayLike = None,
                 geometries: ArrayLike = None):

        if not isinstance(ds, VariogramPoints):
            ds = VariogramPoints(points=ds,
                                 geometries=geometries,
                                 values=values)
            ds = ds.points
        else:
            ds = ds.points

        self.input_array = ds
        self.n_thresholds = number_of_thresholds
        self.thresholds = select_variogram_thresholds(ds[:, -1],
                                                      self.n_thresholds)
        self.ids = code_indicators(self.thresholds, ds=ds)


[docs] class ExperimentalIndicatorVariogram: """ Class describes Experimental Indicator Variogram models. Parameters ---------- number_of_thresholds: int The number of thresholds to model data. step_size : float The distance between lags within each points are included in the calculations. max_range : float The maximum range of analysis. ds : ArrayLike, optional ``[x, y, value]`` values : ArrayLike, optional Observation in the i-th geometry (from ``geometries``). Optional parameter, if not given then ``ds`` must be provided. geometries : ArrayLike, optional Array or similar structure with geometries. It must have the same length as ``values``. Optional parameter, if not given then ``ds`` must be provided. Point type geometry. custom_weights : numpy array, default=None Weights assigned to points, index of weight must be the same as index of point. direction : float (in range [0, 360]), default=None Direction of semivariogram, values from 0 to 360 degrees: - 0 or 180: is E-W, - 90 or 270 is N-S, - 45 or 225 is NE-SW, - 135 or 315 is NW-SE. tolerance : float (in range [0, 1]), default=1 If ``tolerance`` is 0 then points must be placed at a single line with the beginning in the origin of the coordinate system and the direction given by y axis and direction parameter. If ``tolerance`` is ``> 0`` then the bin is selected as an elliptical area with major axis pointed in the same direction as the line for 0 tolerance. * The major axis size == ``step_size``. * The minor axis size is ``tolerance * step_size`` * The baseline point is at a center of the ellipse. * The ``tolerance == 1`` creates an omnidirectional semivariogram. fit : bool, default = True Should models be fitted in the class initialization? Attributes ---------- ds : IndicatorVariogramData Prepared indicator data. step_size : float Derived from the ``step_size`` parameter. max_range : float Derived from the ``max_range`` parameter. custom_weights : numpy array Derived from the ``weights`` parameter. direction : float Derived from the ``direction`` parameter. tolerance : float Derived from the ``tolerance`` parameter. experimental_models : List The ``[threshold, experimental_variogram]`` pairs. Methods ------- fit() Fits indicators to experimental variograms. show() Show experimental variograms for each indicator. References ---------- Goovaerts P. AUTO-IK: a 2D indicator kriging program for automated non-parametric modeling of local uncertainty in earth sciences. DOI: https://doi.org/10.1016/j.cageo.2008.08.014 Examples -------- >>> import numpy as np >>> from pyinterpolate import ExperimentalIndicatorVariogram >>> >>> >>> dem = np.random.random(size=(1000, 3)) >>> step_size = 0.1 >>> max_range = 0.6 >>> exp_indicator = ExperimentalIndicatorVariogram( ... values=dem[:, -1], ... geometries=dem[:, :-1], ... number_of_thresholds=5, ... step_size=step_size, ... max_range=max_range ... ) >>> print(len(exp_indicator.experimental_models)) 5 >>> exp_indicator.show() # plots variogram for each threshold """ def __init__(self, number_of_thresholds: int, step_size: float, max_range: float, ds: Union[ArrayLike, VariogramPoints] = None, values: ArrayLike = None, geometries: ArrayLike = None, custom_weights=None, custom_bins=None, direction: float = None, tolerance: float = None, fit=True): self.ds = IndicatorVariogramData( ds=ds, values=values, geometries=geometries, number_of_thresholds=number_of_thresholds ) self.step_size = step_size self.max_range = max_range self.custom_weights = custom_weights self.custom_bins = custom_bins self.direction = direction self.tolerance = tolerance self.experimental_models = {} if fit: self.fit()
[docs] def fit(self): """ Function fits indicators to models and updates class models. """ for idx, indicator in enumerate(tqdm(self.ds.thresholds)): _index = 2 + idx exp = ExperimentalVariogram( ds=self.ds.ids[:, [0, 1, _index]], step_size=self.step_size, max_range=self.max_range, custom_weights=self.custom_weights, custom_bins=self.custom_bins, direction=self.direction, tolerance=self.tolerance, is_semivariance=True, is_covariance=True ) self.experimental_models[str(indicator)] = exp
[docs] def show(self): """ Function shows generated experimental variograms for each indicator. """ legend = [] plt.figure(figsize=(12, 6)) for idx, rec in self.experimental_models.items(): rec: ExperimentalVariogram plt.scatter(rec.lags, rec.semivariances) legend.append(f'{float(idx):.2f}') plt.legend(legend) plt.xlabel('Distance') plt.ylabel('Semivariance') plt.show()
[docs] class TheoreticalIndicatorVariogram: """ Class models indicator variograms for all indices. Parameters ---------- experimental_indicator_variogram : ExperimentalIndicatorVariogram Fitted experimanetal variograms with indicators for each threshold. Attributes ---------- experimental_indicator_variogram : ExperimentalIndicatorVariogram See ``experimental_indicator_variogram`` parameter. theoretical_indicator_variograms : Dict Dictionary with fitted theoretical models for each threshold. Methods ------- fit() Fits theoretical models to experimental variograms. show() Shows experimental and theoretical variograms for each threshold. Examples -------- >>> import numpy as np >>> from pyinterpolate import ( ... ExperimentalIndicatorVariogram, ... TheoreticalIndicatorVariogram ... ) >>> >>> >>> dem = np.random.random(size=(1000, 3)) >>> step_size = 0.1 >>> max_range = 0.6 >>> exp_indicator = ExperimentalIndicatorVariogram( ... values=dem[:, -1], ... geometries=dem[:, :-1], ... number_of_thresholds=5, ... step_size=step_size, ... max_range=max_range ... ) >>> theo_indicator = TheoreticalIndicatorVariogram( ... experimental_indicator_variogram=exp_indicator ... ) >>> theo_indicator.fit() >>> print(len(theo_indicator.theoretical_indicator_variograms)) 5 >>> theo_indicator.show() # plots theoretical models' curves """ def __init__( self, experimental_indicator_variogram: ExperimentalIndicatorVariogram ): self.experimental_indicator_variogram = experimental_indicator_variogram self.theoretical_indicator_variograms = {}
[docs] def fit(self, models_group: Union[str, list] = 'safe', nugget=None, min_nugget=0, max_nugget=0.5, number_of_nuggets=16, rang=None, min_range=0.1, max_range=0.5, number_of_ranges=16, sill=None, n_sill_values=5, sill_from_variance=False, min_sill=0.5, max_sill=2, number_of_sills=16, direction=None, error_estimator='rmse', deviation_weighting='equal'): """ Method tries to find the optimal range, sill and model (function) of the theoretical semivariogram. Parameters ---------- models_group : str or list, default='safe' Models group to test: - 'all' - the same as list with all models, - 'safe' - ['linear', 'power', 'spherical'] - as a list: multiple model types to test - as a single model type from: - 'circular', - 'cubic', - 'exponential', - 'gaussian', - 'linear', - 'power', - 'spherical'. nugget : float, optional Nugget (bias) of a variogram. If given then it is fixed to this value. min_nugget : float, default = 0 The minimum nugget as the ratio of the parameter to the first lag variance. max_nugget : float, default = 0.5 The maximum nugget as the ratio of the parameter to the first lag variance. number_of_nuggets : int, default = 16 How many equally spaced nuggets tested between ``min_nugget`` and ``max_nugget``. rang : float, optional If given, then range is fixed to this value. min_range : float, default = 0.1 The minimal fraction of a variogram range, ``0 < min_range <= max_range``. max_range : float, default = 0.5 The maximum fraction of a variogram range, ``min_range <= max_range <= 1``. Parameter ``max_range`` greater than **0.5** raises warning. number_of_ranges : int, default = 16 How many bins are tested between ``min_range`` and ``max_range``. sill : float, default = None Partial sill, or sill when nugget is set to zero. Total sill is a sum of partial sill and nugget. If given, then partial sill is fixed to this value. n_sill_values : int, default = 5 The last n experimental semivariance records for sill estimation. (Used only when ``sill_from_variance`` is set to ``False``). sill_from_variance : bool, default = False Estimate sill from the variance (semivariance at distance 0). min_sill : float, default = 0.5 The minimal fraction of the variogram variance at lag 0 to find partial sill, ``0 <= min_sill <= max_sill``. max_sill : float, default = 2 The maximum fraction of the variogram variance at lag 0 to find partial sill. number_of_sills : int, default = 16 How many equally spaced sill values are tested between ``min_sill`` and ``max_sill``. direction : float, in range [0, 360], default=None The direction of a semivariogram. If ``None`` given then semivariogram is isotropic. This parameter is required if passed experimental variogram is stored in a numpy array. error_estimator : str, default = 'rmse' A model error estimation method. Available options are: - 'rmse': Root Mean Squared Error, - 'mae': Mean Absolute Error, - 'bias': Forecast Bias, - 'smape': Symmetric Mean Absolute Percentage Error. deviation_weighting : str, default = "equal" The name of the method used to weight error at a given lags. Works only with RMSE. Available methods: - equal: no weighting, - closest: lags at a close range have bigger weights, - distant: lags that are further away have bigger weights, - dense: error is weighted by the number of point pairs within lag. """ experimental_models = self.experimental_indicator_variogram.experimental_models for idx, experimental in experimental_models.items(): theo = TheoreticalVariogram() theo.autofit(experimental_variogram=experimental, models_group=models_group, nugget=nugget, min_nugget=min_nugget, max_nugget=max_nugget, number_of_nuggets=number_of_nuggets, rang=rang, min_range=min_range, max_range=max_range, number_of_ranges=number_of_ranges, sill=sill, n_sill_values=n_sill_values, sill_from_variance=sill_from_variance, min_sill=min_sill, max_sill=max_sill, number_of_sills=number_of_sills, direction=direction, error_estimator=error_estimator, deviation_weighting=deviation_weighting, return_params=False) self.theoretical_indicator_variograms[idx] = theo
[docs] def show(self, subplots: bool = False): """ Method plots experimental and theoretical variograms. Parameters ---------- subplots : bool, default = False If ``True`` then each indicator variogram is plotted on a separate plot. Otherwise, all variograms are plotted on a single plot. """ if subplots: number_of_subplots = len( list(self.theoretical_indicator_variograms.keys()) ) fig, axes = plt.subplots(number_of_subplots, sharey=True, sharex=True, constrained_layout=True) axes: Any # type checker disabled idx_val = 0 for _key, _item in self.theoretical_indicator_variograms.items(): _item: TheoreticalVariogram axes[idx_val].scatter(_item.lags, _item.experimental_semivariances, marker='x', c='#101010', alpha=0.2) axes[idx_val].plot(_item.lags, _item.yhat, '--', color='#fc8d62') axes[idx_val].set_title('Threshold: ' + f'{float(_key):.2f}') idx_val = idx_val + 1 else: # Plot everything on a single plot plt.figure(figsize=(10, 10)) legend = [] markers_list = list( markers.MarkerStyle.filled_markers ) # Only filled colors_list = [ '#66c2a5', '#fc8d62', '#8da0cb' ] cidx = 0 mlist = deepcopy(markers_list) _marker_idx = 1 for _key, _item in self.theoretical_indicator_variograms.items(): _item: TheoreticalVariogram try: plt.scatter(_item.lags, _item.experimental_semivariances, marker=mlist.pop(0), alpha=0.4, c=colors_list[cidx], edgecolors='black') except IndexError: mlist = deepcopy(markers_list) cidx = cidx + 1 plt.scatter(_item.lags, _item.experimental_semivariances, marker=mlist.pop(0), alpha=0.4, c=colors_list[cidx], edgecolors='black') no_items = len(_item.lags) plt.annotate( str(_marker_idx), [ _item.lags[no_items - 1], _item.experimental_semivariances[no_items - 1] ] ) legend.append( str(_marker_idx) + ' | T: ' + f'{float(_key):.2f}' ) _marker_idx = _marker_idx + 1 plt.legend(legend) plt.show()