Source code for pyinterpolate.kriging.point.ordinary

"""
Perform point ordinary kriging.

Authors
-------
1. Szymon Moliński | @SimonMolinsky
"""
# Python core
from typing import List, Union, Tuple

# Core calculation and data visualization
import numpy as np
from geopandas import GeoSeries
from geopandas.array import GeometryArray
from numpy.typing import ArrayLike

from shapely.geometry import Point
from tqdm import tqdm

from pyinterpolate.core.data_models.points import VariogramPoints, \
    InterpolationPoints
from pyinterpolate.kriging.utils.errors import singular_matrix_error
# Pyinterpolate
from pyinterpolate.kriging.utils.point_kriging_solve import (get_predictions,
                                                             solve_weights,
                                                             __experimental_solve_weights_lsa_only)
from pyinterpolate.transform.statistical import sem_to_cov
from pyinterpolate.semivariogram.theoretical.theoretical import TheoreticalVariogram


def ok_calc(
        theoretical_model: TheoreticalVariogram,
        unknown_location: ArrayLike,
        known_locations: ArrayLike = None,
        known_values: ArrayLike = None,
        known_geometries: ArrayLike = None,
        neighbors_range=None,
        no_neighbors=4,
        max_tick=5.,
        use_all_neighbors_in_range=False,
        allow_approximate_solutions=False
):
    """
    Function predicts value at unknown location with Ordinary Kriging
    technique.

    Parameters
    ----------
    theoretical_model : TheoreticalVariogram
        Fitted theoretical variogram model.

    unknown_location : Union[ArrayLike, Point]
        Points where you want to estimate value ``(x, y) <-> (lon, lat)``.

    known_locations : numpy array
        Known locations: ``[x, y, value]``.

    known_values : ArrayLike, optional
        Observation in the i-th geometry (from ``known_geometries``). Optional
        parameter, if not given then ``known_locations`` must be provided.

    known_geometries : ArrayLike, optional
        Array or similar structure with geometries. It must have the same
        length as ``known_values``. Optional parameter, if not given then
        ``known_locations`` must be provided. Point type geometry.

    neighbors_range : float, default=None
        The maximum distance where we search for neighbors. If ``None`` is
        given then the range is selected from the Theoretical
        Model's ``rang`` attribute.

    no_neighbors : int, default = 4
        The number of **n-closest neighbors** used for interpolation.

    max_tick : float, default=5.
        If searching for neighbors in a specific direction how big should be
        a tolerance for increasing the search angle (how many degrees more).

    use_all_neighbors_in_range : bool, default = False
        ``True``: if the real number of neighbors within
        the ``neighbors_range`` is greater than the ``number_of_neighbors``
        then take all of them anyway.

    allow_approximate_solutions : bool, default=False
        Allows the approximation of kriging weights based on the OLS
        algorithm. We don't recommend set it to ``True`` if you don't know
        what are you doing. This parameter can be useful when you have
        clusters in your dataset, that can lead to singular or near-singular
        matrix creation.

    Returns
    -------
    : numpy array
        ``[predicted value, variance error, longitude (x), latitude (y)]``

    Raises
    ------
    RunetimeError
        Singular Matrix in the Kriging system.
    """
    # Check if known locations are in the right format
    # Validate points
    if not isinstance(known_locations, VariogramPoints):
        known_locations = VariogramPoints(points=known_locations,
                                          geometries=known_geometries,
                                          values=known_values)
        known_locations = known_locations.points

    # Check if unknown location is Point
    if isinstance(unknown_location, Point):
        unknown_location = (
            unknown_location.x,
            unknown_location.y
        )
        unknown_location = np.array(unknown_location)

    k, predicted, dataset = get_predictions(theoretical_model,
                                            known_locations,
                                            unknown_location,
                                            neighbors_range,
                                            no_neighbors,
                                            use_all_neighbors_in_range,
                                            max_tick)

    k_ones = np.ones(1)[0]
    k = np.r_[k, k_ones]

    p_ones = np.ones((predicted.shape[0], 1))
    predicted_with_ones_col = np.c_[predicted, p_ones]
    p_ones_row = np.ones((1, predicted_with_ones_col.shape[1]))
    p_ones_row[0][-1] = 0.
    weights = np.r_[predicted_with_ones_col, p_ones_row]

    try:
        output_weights = solve_weights(weights,
                                       k,
                                       allow_approximate_solutions)
        zhat = dataset[:, -2].dot(output_weights[:-1])

        sigma = np.matmul(output_weights.T, k)

        if sigma < 0:
            return [zhat, np.nan, unknown_location[0], unknown_location[1]]

        return [zhat, sigma, unknown_location[0], unknown_location[1]]

    except np.linalg.LinAlgError as _:
        singular_matrix_error()


[docs] def ordinary_kriging( theoretical_model: TheoreticalVariogram, unknown_locations: Union[np.ndarray, Point, List, Tuple, GeoSeries, GeometryArray, ArrayLike], known_locations: ArrayLike = None, known_values: ArrayLike = None, known_geometries: ArrayLike = None, neighbors_range=None, no_neighbors=4, max_tick=5., use_all_neighbors_in_range=False, allow_approximate_solutions=False, progress_bar: bool = True ) -> np.ndarray: """ Function predicts value at unknown location with Ordinary Kriging technique. Parameters ---------- theoretical_model : TheoreticalVariogram Fitted theoretical variogram model. unknown_locations : Union[ArrayLike, Point] Points where you want to estimate value ``(x, y) <-> (lon, lat)``. known_locations : numpy array Known locations: ``[x, y, value]``. known_values : ArrayLike, optional Observation in the i-th geometry (from ``known_geometries``). Optional parameter, if not given then ``known_locations`` must be provided. known_geometries : ArrayLike, optional Array or similar structure with geometries. It must have the same length as ``known_values``. Optional parameter, if not given then ``known_locations`` must be provided. Point type geometry. neighbors_range : float, default=None The maximum distance where we search for neighbors. If ``None`` is given then the range is selected from the Theoretical Model's ``rang`` attribute. no_neighbors : int, default = 4 The number of **n-closest neighbors** used for interpolation. max_tick : float, default=5. If searching for neighbors in a specific direction how big should be a tolerance for increasing the search angle (how many degrees more). use_all_neighbors_in_range : bool, default = False ``True``: if the real number of neighbors within the ``neighbors_range`` is greater than the ``number_of_neighbors`` then take all of them anyway. allow_approximate_solutions : bool, default=False Allows the approximation of kriging weights based on the OLS algorithm. We don't recommend set it to ``True`` if you don't know what are you doing. This parameter can be useful when you have clusters in your dataset, that can lead to singular or near-singular matrix creation. progress_bar : bool, default=True Show a progress bar during the interpolation process. Returns ------- : numpy array ``[predicted value, variance error, longitude (x), latitude (y)]`` Raises ------ RunetimeError Singular Matrix in the Kriging system. Examples -------- >>> import geopandas as gpd >>> import numpy as np >>> import pandas as pd >>> >>> from pyinterpolate import (build_experimental_variogram, ... build_theoretical_variogram, ordinary_kriging) >>> >>> dem = gpd.read_file('dem.gpkg') >>> unknown_locations = gpd.read_file('unknown_locations.gpkg') >>> step_size = 500 >>> max_range = 10000 >>> exp_variogram = build_experimental_variogram( ... values=dem['dem'], ... geometries=dem['geometry'], ... step_size=step_size, ... max_range=max_range ... ) >>> theo_variogram = build_theoretical_variogram(exp_variogram) >>> interp = ordinary_kriging( ... theoretical_model=theo_variogram, ... unknown_locations=unknown_locations['geometry'], ... known_values=dem['dem'], ... known_geometries=dem['geometry'] ... ) >>> print(interp[0]) [7.91222896e+01 9.72740449e+01 2.38012302e+05 5.51466805e+05] """ # Check if known locations are in the right format if not isinstance(known_locations, VariogramPoints): known_locations = VariogramPoints(points=known_locations, geometries=known_geometries, values=known_values) known_locations = known_locations.points unknown_locations = InterpolationPoints(unknown_locations).points interpolated_results = [] _disable_progress_bar = not progress_bar for upoints in tqdm(unknown_locations, disable=_disable_progress_bar): res = ok_calc( theoretical_model=theoretical_model, known_locations=known_locations, unknown_location=upoints, neighbors_range=neighbors_range, no_neighbors=no_neighbors, max_tick=max_tick, use_all_neighbors_in_range=use_all_neighbors_in_range, allow_approximate_solutions=allow_approximate_solutions ) interpolated_results.append( res ) return np.array(interpolated_results)
def ordinary_kriging_from_cov( theoretical_model: TheoreticalVariogram, unknown_location: Union[List, Tuple, np.ndarray], known_locations: ArrayLike = None, known_values: ArrayLike = None, known_geometries: ArrayLike = None, sill=None, neighbors_range=None, no_neighbors=4, max_tick=5., use_all_neighbors_in_range=False, allow_approximate_solutions=False ) -> List: """ Function predicts value at unknown location using Ordinary Kriging. Parameters ---------- theoretical_model : TheoreticalVariogram Fitted theoretical variogram model. unknown_location : Union[List, Tuple, numpy array] Point where you want to estimate value ``(x, y) <-> (lon, lat)``. known_locations : numpy array Known locations: ``[x, y, value]``. known_values : ArrayLike, optional Observation in the i-th geometry (from ``known_geometries``). Optional parameter, if not given then ``known_locations`` must be provided. known_geometries : ArrayLike, optional Array or similar structure with geometries. It must have the same length as ``known_values``. Optional parameter, if not given then ``known_locations`` must be provided. Point type geometry. sill : float Partial sill, or sill when nugget is set to zero. Total sill is a sum of partial sill and nugget. If given, then partial sill is fixed to this value. neighbors_range : float, default=None The maximum distance where we search for neighbors. If ``None`` is given then range is selected from the Theoretical Model's ``rang`` attribute. no_neighbors : int, default = 4 The number of the **n-closest neighbors** used for interpolation. max_tick : float, default=5. If searching for neighbors in a specific direction how large should be tolerance for increasing the search angle (how many degrees more). use_all_neighbors_in_range : bool, default = False ``True``: if the real number of neighbors within the ``neighbors_range`` is greater than the ``number_of_neighbors`` parameter then take all of them anyway. allow_approximate_solutions : bool, default=False Allows the approximation of kriging weights based on the OLS algorithm. We don't recommend set it to ``True`` if you don't know what are you doing. This parameter can be useful when you have clusters in your dataset, that can lead to singular or near-singular matrix creation. Returns ------- : numpy array ``[predicted value, variance error, longitude (x), latitude (y)]`` Raises ------ RunetimeError Singularity matrix in a Kriging system. """ if not isinstance(known_locations, VariogramPoints): known_locations = VariogramPoints(points=known_locations, geometries=known_geometries, values=known_values) known_locations = known_locations.points k, predicted, dataset = get_predictions(theoretical_model, known_locations, unknown_location, neighbors_range, no_neighbors, use_all_neighbors_in_range, max_tick) if sill is None: sill = theoretical_model.sill k = sem_to_cov(k, sill) predicted = sem_to_cov(predicted, sill) k_ones = np.ones(1)[0] k = np.r_[k, k_ones] p_ones = np.ones((predicted.shape[0], 1)) predicted_with_ones_col = np.c_[predicted, p_ones] p_ones_row = np.ones((1, predicted_with_ones_col.shape[1])) p_ones_row[0][-1] = 0. weights = np.r_[predicted_with_ones_col, p_ones_row] try: output_weights = solve_weights(weights, k, allow_approximate_solutions) zhat = dataset[:, -2].dot(output_weights[:-1]) sigma = sill - np.matmul(output_weights.T, k) if sigma < 0: return [zhat, np.nan, unknown_location[0], unknown_location[1]] return [zhat, sigma, unknown_location[0], unknown_location[1]] except np.linalg.LinAlgError as _: singular_matrix_error() def experimental_ordinary_kriging_lsa( theoretical_model: TheoreticalVariogram, unknown_locations: Union[ np.ndarray, Point, List, Tuple, GeoSeries, GeometryArray, ArrayLike], known_locations: ArrayLike = None, known_values: ArrayLike = None, known_geometries: ArrayLike = None, neighbors_range=None, no_neighbors=4, max_tick=5., use_all_neighbors_in_range=False, progress_bar: bool = True ) -> np.ndarray: """ Function predicts value at unknown location using Least Squares Approximation. Parameters ---------- theoretical_model : TheoreticalVariogram Fitted theoretical variogram model. unknown_locations : Union[ArrayLike, Point] Points where you want to estimate value ``(x, y) <-> (lon, lat)``. known_locations : numpy array Known locations: ``[x, y, value]``. known_values : ArrayLike, optional Observation in the i-th geometry (from ``known_geometries``). Optional parameter, if not given then ``known_locations`` must be provided. known_geometries : ArrayLike, optional Array or similar structure with geometries. It must have the same length as ``known_values``. Optional parameter, if not given then ``known_locations`` must be provided. Point type geometry. neighbors_range : float, default=None The maximum distance where we search for neighbors. If ``None`` is given then the range is selected from the Theoretical Model's ``rang`` attribute. no_neighbors : int, default = 4 The number of **n-closest neighbors** used for interpolation. max_tick : float, default=5. If searching for neighbors in a specific direction how big should be a tolerance for increasing the search angle (how many degrees more). use_all_neighbors_in_range : bool, default = False ``True``: if the real number of neighbors within the ``neighbors_range`` is greater than the ``number_of_neighbors`` then take all of them anyway. progress_bar : bool, default=True Show a progress bar during the interpolation process. Returns ------- : numpy array ``[predicted value, variance error, longitude (x), latitude (y)]`` Raises ------ RunetimeError Singular Matrix in the Kriging system. Examples -------- >>> import geopandas as gpd >>> import numpy as np >>> import pandas as pd >>> >>> from pyinterpolate import (build_experimental_variogram, ... build_theoretical_variogram, ordinary_kriging) >>> >>> dem = gpd.read_file('dem.gpkg') >>> unknown_locations = gpd.read_file('unknown_locations.gpkg') >>> step_size = 500 >>> max_range = 10000 >>> exp_variogram = build_experimental_variogram( ... values=dem['dem'], ... geometries=dem['geometry'], ... step_size=step_size, ... max_range=max_range ... ) >>> theo_variogram = build_theoretical_variogram(exp_variogram) >>> interp = ordinary_kriging( ... theoretical_model=theo_variogram, ... unknown_locations=unknown_locations['geometry'], ... known_values=dem['dem'], ... known_geometries=dem['geometry'] ... ) >>> print(interp[0]) [7.91222896e+01 9.72740449e+01 2.38012302e+05 5.51466805e+05] """ # Check if known locations are in the right format if not isinstance(known_locations, VariogramPoints): known_locations = VariogramPoints(points=known_locations, geometries=known_geometries, values=known_values) known_locations = known_locations.points unknown_locations = InterpolationPoints(unknown_locations).points interpolated_results = [] _disable_progress_bar = not progress_bar for upoints in tqdm(unknown_locations, disable=_disable_progress_bar): res = __experimental_ok_calc_lsa( theoretical_model=theoretical_model, known_locations=known_locations, unknown_location=upoints, neighbors_range=neighbors_range, no_neighbors=no_neighbors, max_tick=max_tick, use_all_neighbors_in_range=use_all_neighbors_in_range ) interpolated_results.append( res ) return np.array(interpolated_results) def __experimental_ok_calc_lsa( theoretical_model: TheoreticalVariogram, unknown_location: ArrayLike, known_locations: ArrayLike = None, known_values: ArrayLike = None, known_geometries: ArrayLike = None, neighbors_range=None, no_neighbors=4, max_tick=5., use_all_neighbors_in_range=False ): """ Function predicts value at unknown location with Ordinary Kriging technique. Parameters ---------- theoretical_model : TheoreticalVariogram Fitted theoretical variogram model. unknown_location : Union[ArrayLike, Point] Points where you want to estimate value ``(x, y) <-> (lon, lat)``. known_locations : numpy array Known locations: ``[x, y, value]``. known_values : ArrayLike, optional Observation in the i-th geometry (from ``known_geometries``). Optional parameter, if not given then ``known_locations`` must be provided. known_geometries : ArrayLike, optional Array or similar structure with geometries. It must have the same length as ``known_values``. Optional parameter, if not given then ``known_locations`` must be provided. Point type geometry. neighbors_range : float, default=None The maximum distance where we search for neighbors. If ``None`` is given then the range is selected from the Theoretical Model's ``rang`` attribute. no_neighbors : int, default = 4 The number of **n-closest neighbors** used for interpolation. max_tick : float, default=5. If searching for neighbors in a specific direction how big should be a tolerance for increasing the search angle (how many degrees more). use_all_neighbors_in_range : bool, default = False ``True``: if the real number of neighbors within the ``neighbors_range`` is greater than the ``number_of_neighbors`` then take all of them anyway. Returns ------- : numpy array ``[predicted value, variance error, longitude (x), latitude (y)]`` Raises ------ RunetimeError Singular Matrix in the Kriging system. """ # Check if known locations are in the right format # Validate points if not isinstance(known_locations, VariogramPoints): known_locations = VariogramPoints(points=known_locations, geometries=known_geometries, values=known_values) known_locations = known_locations.points # Check if unknown location is Point if isinstance(unknown_location, Point): unknown_location = ( unknown_location.x, unknown_location.y ) unknown_location = np.array(unknown_location) k, predicted, dataset = get_predictions(theoretical_model, known_locations, unknown_location, neighbors_range, no_neighbors, use_all_neighbors_in_range, max_tick) k_ones = np.ones(1)[0] k = np.r_[k, k_ones] p_ones = np.ones((predicted.shape[0], 1)) predicted_with_ones_col = np.c_[predicted, p_ones] p_ones_row = np.ones((1, predicted_with_ones_col.shape[1])) p_ones_row[0][-1] = 0. weights = np.r_[predicted_with_ones_col, p_ones_row] output_weights = __experimental_solve_weights_lsa_only(weights, k) zhat = dataset[:, -2].dot(output_weights[:-1]) sigma = np.matmul(output_weights.T, k) if sigma < 0: return [zhat, np.nan, unknown_location[0], unknown_location[1]] return [zhat, sigma, unknown_location[0], unknown_location[1]]