Source code for pyinterpolate.kriging.point.indicator

"""
Perform indicator kriging.

Authors
-------
1. Szymon Moliński | @SimonMolinsky

Bibliography
------------
[1] P. Goovaerts, AUTO-IK: A 2D indicator kriging program for the automated
    non-parametric modeling of local uncertainty in earth sciences,
    Computers & Geosciences, Volume 35, Issue 6, 2009, Pages 1255-1270,
    ISSN 0098-3004,
    https://doi.org/10.1016/j.cageo.2008.08.014.

Indicator kriging is performed for each threshold using four types of
destination geography:
# TODO: helper methods to perform operations below:
1) grid of points specified by the user,
2) rectangular grid,
3) sampled locations (cross-validation option), and
4) set of test locations (jack-knife option).
"""
from typing import Tuple, Dict
from numpy.typing import ArrayLike

import numpy as np
from tqdm import tqdm

from scipy.interpolate import UnivariateSpline

from pyinterpolate.kriging.point.ordinary import ordinary_kriging
from pyinterpolate.kriging.point.simple import simple_kriging
from pyinterpolate.semivariogram.indicator.indicator import TheoreticalIndicatorVariogram
from pyinterpolate.transform.geo import geometry_and_values_array


[docs] class IndicatorKriging: """ Class performs indicator kriging. Parameters ---------- indicator_variograms : IndicatorVariograms Modeled variograms for each threshold. unknown_locations : numpy ndarray Points where we want to estimate value ``(x, y) <-or-> (lon, lat)``. known_locations : numpy array, optional The known locations: ``[x, y, value]``. known_values : ArrayLike, optional Observation in the i-th geometry (from ``known_geometries``). Optional parameter, if not given then ``known_locations`` must be provided. known_geometries : ArrayLike, optional Array or similar structure with geometries. It must have the same length as ``known_values``. Optional parameter, if not given then ``known_locations`` must be provided. Point type geometry. kriging_type : str, default = 'ok' Type of kriging to perform. Possible values: 'ok' - ordinary kriging, 'sk' - simple kriging. process_mean : float The mean value of a process over a study area. Should be known before processing. That's why Simple Kriging has a limited number of applications. You must have multiple samples and well-known area to know this parameter. neighbors_range : float, default=None The maximum distance where we search for neighbors. If ``None`` is given then range is selected from the ``theoretical_model`` ``rang`` attribute. no_neighbors : int, default = 4 The number of the **n-closest neighbors** used for interpolation. use_all_neighbors_in_range : bool, default = False ``True``: if the real number of neighbors within the ``neighbors_range`` is greater than the ``number_of_neighbors`` parameter then take all of them anyway. allow_approximate_solutions : bool, default=False Allows the approximation of kriging weights based on the OLS algorithm. We don't recommend set it to ``True`` if you don't know what are you doing. This parameter can be useful when you have clusters in your dataset, that can lead to singular or near-singular matrix creation. get_expected_values : bool, default=True If ``True`` then expected values and variances are calculated. Attributes ---------- thresholds : array Thresholds used for indicator kriging. coordinates : array Coordinates of unknown locations. indicator_predictions : array Indicator kriging predictions for each threshold and each unknown location. expected_values : array Expected values derived from ``indicator_predictions`` for each unknown location. variances : array Variances derived from ``indicator_predictions`` for each unknown location. Methods ------- get_indicator_maps() Returns dictionary with thresholds and indicator maps for each of them. get_expected_values() Returns two arrays: one array with coordinates and expected values, and the second with coordinates and variances. Examples -------- >>> import numpy as np >>> from pyinterpolate.kriging.point.indicator import IndicatorKriging >>> from pyinterpolate.semivariogram.indicator.indicator import ( ... ExperimentalIndicatorVariogram, ... TheoreticalIndicatorVariogram ... ) >>> >>> >>> dem = np.random.random(size=(1000, 3)) >>> exp_variogram = ExperimentalIndicatorVariogram( ... values=dem[:, -1], ... geometries=dem[:, :-1], ... number_of_thresholds=3, ... step_size=0.1, ... max_range=0.6 ... ) >>> theo_variograms = TheoreticalIndicatorVariogram( ... experimental_indicator_variogram=exp_variogram ... ) >>> theo_variogram.fit() >>> ikriging = IndicatorKriging( ... known_values=dem[:, -1], ... known_geometries=dem[:, :-1], ... indicator_variograms=theo_variograms, ... unknown_locations=np.random.random(size=(100, 2)), ... kriging_type='ok', ... no_neighbors=16 ... ) >>> print(ikriging.indicator_predictions[0]) [0.12942075 0.13930802 0. ] >>> print(ikriging.expected_values[:3]) [0.64983095 0.64983095 0.64983095] >>> print(ikriging.variances[:3]) [0.28827397 0.28539667 0.30135499] >>> imaps = ikriging.get_indicator_maps() >>> print(imaps.keys()) dict_keys([0.35200453863293624, 0.6783974224309782, 0.9929787628823334]) """ def __init__(self, indicator_variograms: TheoreticalIndicatorVariogram, unknown_locations: ArrayLike, known_locations: ArrayLike = None, known_values: ArrayLike = None, known_geometries: ArrayLike = None, kriging_type: str = 'ok', process_mean: float = None, neighbors_range=None, no_neighbors=4, max_tick=5., use_all_neighbors_in_range=False, allow_approximate_solutions=False, get_expected_values=True): if known_locations is None: known_locations = geometry_and_values_array( geometry=known_geometries, values=known_values ) self.thresholds = np.array( list( indicator_variograms.theoretical_indicator_variograms.keys() ) ).astype(float) self.coordinates = np.array(unknown_locations) self.indicator_predictions = self._estimate( known_locations, indicator_variograms, unknown_locations, kriging_type, process_mean, neighbors_range, no_neighbors, max_tick, use_all_neighbors_in_range, allow_approximate_solutions ) self.expected_values = None self.variances = None if get_expected_values: self.expected_values, self.variances = self._get_expected_values()
[docs] def get_indicator_maps(self) -> Dict: """ Method returns indicator map for each threshold. Returns ------- indicator_maps : Dict Indicator map for each threshold. """ indicator_maps = {} for idx, threshold in enumerate(self.thresholds): indicator_maps[threshold] = np.column_stack( [self.coordinates[:, 0], self.coordinates[:, 1], self.indicator_predictions[:, idx]] ) return indicator_maps
[docs] def get_expected_values_maps(self) -> Tuple[np.ndarray, np.ndarray]: """ Method returns expected values and variances for each threshold. Returns ------- expected_values, variances : numpy ndarray, numpy ndarray Expected values and variances. """ if self.expected_values is None: self._get_expected_values() # Expected values map expected_values_map = np.column_stack( [self.coordinates[:, 0], self.coordinates[:, 1], self.expected_values] ) # Variances map variances_map = np.column_stack( [self.coordinates[:, 0], self.coordinates[:, 1], self.variances] ) return expected_values_map, variances_map
def _estimate(self, known_locations, indicator_variograms, unknown_locations, kriging_type, process_mean, neighbors_range, no_neighbors, max_tick, use_all_neighbors_in_range=False, allow_approximate_solutions=False) -> np.ndarray: """ Method estimates probabilities of a location becoming within a given threshold. Parameters ---------- known_locations : numpy ndarray The known locations ``[x, y, value]``. indicator_variograms : IndicatorVariograms Modeled variograms for each threshold. unknown_locations : numpy ndarray Points where we want to estimate value ``(x, y) <-or-> (lon, lat)``. kriging_type : str, default = 'ok' Type of kriging to perform. Possible values: 'ok' - ordinary kriging, 'sk' - simple kriging. process_mean : float The mean value of a process over a study area. Should be known before processing. That's why Simple Kriging has a limited number of applications. You must have multiple samples and well-sampled area to know this parameter. neighbors_range : float, default=None The maximum distance where we search for neighbors. If ``None`` is given then range is selected from the ``theoretical_model`` ``rang`` attribute. no_neighbors : int, default = 4 The number of the **n-closest neighbors** used for interpolation. use_all_neighbors_in_range : bool, default = False ``True``: if the real number of neighbors within the ``neighbors_range`` is greater than the ``number_of_neighbors`` then take all of them anyway. allow_approximate_solutions : bool, default=False Allows the approximation of kriging weights based on the OLS algorithm. We don't recommend set it to ``True`` if you don't know what are you doing. This parameter can be useful when you have clusters in your dataset, that can lead to singular or near-singular matrix creation. Returns ------- indicator_predictions : numpy array The indicator probabilities where each column is a threshold. Raises ------ ValueError Wrong Kriging type, allowed types: 'ok' or 'sk' """ predictions_all = [] d_items = indicator_variograms.theoretical_indicator_variograms.items() for _key, _item in tqdm(d_items): indicator_points = known_locations.copy() indicator_points[:, -1] = ( indicator_points[:, -1] <= float(_key) ).astype(int) predictions = [] # !!! TODO remove iteration over unknown locations for _point in unknown_locations: if kriging_type == 'ok': _pred_arr = ordinary_kriging( theoretical_model=_item, known_locations=indicator_points, unknown_locations=_point, neighbors_range=neighbors_range, no_neighbors=no_neighbors, max_tick=max_tick, use_all_neighbors_in_range=use_all_neighbors_in_range, allow_approximate_solutions=allow_approximate_solutions, progress_bar=False ) elif kriging_type == 'sk': _pred_arr = simple_kriging( theoretical_model=_item, known_locations=indicator_points, unknown_locations=_point, process_mean=process_mean, neighbors_range=neighbors_range, no_neighbors=no_neighbors, max_tick=max_tick, use_all_neighbors_in_range=use_all_neighbors_in_range, allow_approximate_solutions=allow_approximate_solutions, progress_bar=False ) else: raise ValueError('Kriging type not supported. ' 'Please choose from: ' '"ok" - ordinary kriging, ' '"sk" - simple kriging.') if len(_pred_arr) == 1: predictions.append(_pred_arr[0][1]) else: predictions.append(_pred_arr[0]) predictions_all.append(predictions) indicator_predictions = np.column_stack(predictions_all) indicator_predictions = self._clean_probabilities( indicator_predictions ) return indicator_predictions def _get_expected_values( self, ccdf_density=100 ) -> Tuple[np.ndarray, np.ndarray]: """ Function gets expected values and their variance for each point based on the ccdf function. Parameters ---------- ccdf_density : int, default = 100 The number of points used to interpolate expected value from the ccdf function. Returns ------- expected_values, expected_variances : Tuple The expected value and the expected variance for each point. """ expected_values = [] expected_variances = [] for row in self.indicator_predictions: old_indices = self.thresholds.copy() new_length = ccdf_density new_indices = np.linspace(old_indices.min(), old_indices.max(), new_length) if len(old_indices) >= 4: spl = UnivariateSpline(old_indices, row, s=0) else: spl = UnivariateSpline(old_indices, row, k=(len(old_indices) - 1), s=0) new_tvals = spl(new_indices) cdf = new_tvals.cumsum() / new_tvals.sum() ccdf = 1 - cdf expected_c = np.mean(ccdf) expected_c_index = np.searchsorted(np.flip(ccdf), expected_c) expected_c_index = len(ccdf) - expected_c_index expected_value = new_indices[expected_c_index] expected_values.append(expected_value) # Variance expected_variance = np.mean( np.abs(new_tvals - expected_value) ** 2 ) expected_variances.append(expected_variance) return np.array(expected_values), np.array(expected_variances) def _clean_probabilities(self, array: np.ndarray): """ Function normalizes and transform given probabilities. Parameters ---------- array : numpy array Array which rows are transformed. Returns ------- transformed : numpy array Transformed (normalized along rows) array. """ arr = self.__cut_to_zero_one(array).copy() return arr @staticmethod def __cut_to_zero_one(arr): arr[arr > 1] = 1 arr[arr < 0] = 0 return arr