Source code for pyinterpolate.kriging.point.universal

from typing import Union, Any

import matplotlib.pyplot as plt
import numpy as np
from numpy.typing import ArrayLike

from pyinterpolate.core.data_models.points import VariogramPoints
from pyinterpolate.core.pipelines.interpolate import interpolate_points, \
    interpolate_points_dask

from pyinterpolate.semivariogram.experimental.classes.experimental_variogram import ExperimentalVariogram
from pyinterpolate.semivariogram.theoretical.classes.theoretical_variogram import TheoreticalVariogram


class MultivariateRegression:
    """
    Class performs multivariate regression.

    Attributes
    ----------
    features : numpy array
        Features matrix.

    y : numpy array
        Predicted value.

    coefficients : numpy array
        Array of coefficients.

    intercept : float
        Constant bias.

    Methods
    -------
    fit()
        Fits observations into linear regression model.

    predict()
        Predicts values based on the coefficients from training (``fit()``)
        step.
    """

    def __init__(self):
        self.features = None
        self.y = None
        self.coefficients = None
        self.intercept = None

    def fit(self, dataset: np.ndarray):
        """
        Method fits dataset into the linear regression model.

        Parameters
        ----------
        dataset : numpy array
            Numpy array with more than 2 columns. The last column represents
            response - observations.
        """
        self.features = dataset[:, :-1].copy()
        self.y = dataset[:, -1].copy()
        _features_ones = np.c_[
            self.features, np.ones(self.features.shape[0])
        ]
        params = self._get_coefficients(_features_ones)
        self.coefficients = params[:-1]
        self.intercept = params[-1]

    def predict(self, features: np.ndarray):
        """
        Predicts response value based on given variables.

        Parameters
        ----------
        features : numpy array
            The variables used for prediction.

        Returns
        -------
        : numpy array
            Predicted values.
        """
        # TODO: check if features has the same number of columns
        #  as self.features
        return np.matmul(self.coefficients, features.T) + self.intercept

    def _get_coefficients(self, features_with_ones):
        return np.linalg.lstsq(features_with_ones, self.y, rcond=None)[0]


[docs] class UniversalKriging: """ Performs universal kriging. Parameters ---------- known_points : ArrayLike, optional Known points and their values ``[lon, lat, value]`` known_values : ArrayLike, optional Observation in the i-th geometry (from ``known_geometries``). Optional parameter, if not given then ``known_locations`` must be provided. known_geometries : ArrayLike, optional Array or similar structure with geometries. It must have the same length as ``known_values``. Optional parameter, if not given then ``known_locations`` must be provided. Point type geometry. fitted_regression_model : optional Any kind of regression model with `.predict()` method that could be used for trend modeling. Attributes ---------- known_points : numpy array See ``observations`` parameter. trend_model : MultivariateRegression or another regression model The model responsible for trend prediction. It could be a model from the external package, but it must have `.predict()` method which takes as an input modeling features (coordinates). trend_values : numpy array Observation values predicted by regression model. bias_values : numpy array The difference between measured observations and ``trend_values``. bias_experimental_model : ExperimentalVariogram The experimental semivariogram of bias. bias_model : TheoreticalVariogram Modeled semivariogram of the data bias. Methods ------- fit_trend() Fits observations into linear regression model. detrend() Removes trend from observations (calculates bias). fit_bias() Fits bias into semivariogram model. predict() Predicts values at unknown locations. plot_experimental_bias_model() Plots experimental semivariogram of bias. plot_theoretical_bias_model() Plots theoretical semivariogram of bias. plot_trend_surfaces() Visual comparison of observations, trend, and bias. Examples -------- >>> import geopandas as gpd >>> import numpy as np >>> import pandas as pd >>> >>> from pyinterpolate.kriging.point.universal import UniversalKriging >>> >>> >>> dem = gpd.read_file('dem.gpkg') >>> unknown_points = gpd.read_file('unknown_locations.gpkg') >>> uk = UniversalKriging( ... known_values=dem[:, -1], ... known_geometries=dem[:, :-1] ... ) >>> uk.fit_trend() >>> uk.detrend() >>> uk.fit_bias( ... step_size=500, max_range=10000 ... ) >>> predictions = uk.predict(points=unknown_points) >>> print(predictions[0]) # z_hat, x, y [9.72916006e+01 2.38012302e+05 5.51466805e+05] """ def __init__(self, known_points: ArrayLike = None, known_values: ArrayLike = None, known_geometries: ArrayLike = None, fitted_regression_model: Any = None): # Core known_points = VariogramPoints(points=known_points, geometries=known_geometries, values=known_values) known_points = known_points.points self.known_points = known_points # Trend self.trend_model = None self.trend_values = None if fitted_regression_model is not None: self.trend_model = fitted_regression_model self.trend_values = self.trend_model.predict( self.known_points[:, :-1] ) # Bias self.bias_values = None self.bias_experimental_model: ExperimentalVariogram = None self.bias_model: TheoreticalVariogram = None
[docs] def fit_trend(self): """ Function fits data into MultivariateRegression model. """ trend_model = MultivariateRegression() trend_model.fit(self.known_points) self.trend_model = trend_model self.trend_values = self.trend_model.predict( features=self.known_points[:, :-1] )
[docs] def detrend(self, trend_values: ArrayLike = None): """ Function removes trend from observations. Parameters ---------- trend_values : optional, numpy array Optional array with trend values. It may be an output from the external regression model. """ if trend_values is not None: self.bias_values = self.known_points[:, -1] - trend_values else: self.bias_values = self.known_points[:, -1] - self.trend_values
[docs] def fit_bias(self, step_size: float, max_range: float, use_all_models=False): """ Function fits bias into variogram models. Parameters ---------- step_size : float Bins width. max_range : float Maximum range of analysis. use_all_models : bool, default = False Use all available semivariogram models (``True``), or only the selection of models - linear, spherical, and power model. """ # Get bias if self.bias_values is None: self.detrend() # Create experimental semivariogram bias_arr = np.c_[ self.known_points[:, :-1], self.bias_values ] self.bias_experimental_model = ExperimentalVariogram( ds=bias_arr, step_size=step_size, max_range=max_range, is_semivariance=True, is_covariance=False ) # Fit experimental into theoretical if use_all_models: used_models = 'all' else: used_models = 'safe' self.bias_model = TheoreticalVariogram() self.bias_model.autofit( experimental_variogram=self.bias_experimental_model, models_group=used_models )
[docs] def predict(self, points: ArrayLike, neighbors_range: Union[float, None] = None, no_neighbors: int = 4, use_all_neighbors_in_range=False, allow_approx_solutions=False, number_of_workers: int = 1, show_progress_bar: bool = True): """ Parameters ---------- points : numpy array Coordinates with missing values (to estimate results). neighbors_range : float, default=None The maximum distance where we search for neighbors. If ``None`` is given then range is selected from the ``theoretical_model`` ``rang`` attribute. no_neighbors : int, default = 4 The number of the **n-closest neighbors** used for interpolation. use_all_neighbors_in_range : bool, default = False ``True``: if the real number of neighbors within the ``neighbors_range`` is greater than the ``number_of_neighbors`` parameter then take all of them anyway. allow_approx_solutions : bool, default=False Allows the approximation of kriging weights based on the OLS algorithm. We don't recommend set it to ``True`` if you don't know what are you doing. This parameter can be useful when you have clusters in your dataset, that can lead to singular or near-singular matrix creation. number_of_workers : int, default=1 How many processing units can be used for predictions. Increase it only for a very large number of interpolated points (~10k+). show_progress_bar : bool, default=True Show progress bar of predictions. Returns ------- : numpy array Predictions ``[predicted value, longitude (x), latitude (y)]`` """ # Predict trend surface from regression model trends = self.trend_model.predict( points ) # Predict bias if number_of_workers <= 1: biases = interpolate_points( theoretical_model=self.bias_model, known_locations=np.c_[ self.known_points[:, :-1], self.bias_values], unknown_locations=points, neighbors_range=neighbors_range, no_neighbors=no_neighbors, use_all_neighbors_in_range=use_all_neighbors_in_range, allow_approximate_solutions=allow_approx_solutions, progress_bar=show_progress_bar ) else: biases = interpolate_points_dask( theoretical_model=self.bias_model, known_locations=np.c_[ self.known_points[:, :-1], self.bias_values], unknown_locations=points, neighbors_range=neighbors_range, no_neighbors=no_neighbors, use_all_neighbors_in_range=use_all_neighbors_in_range, allow_approximate_solutions=allow_approx_solutions, progress_bar=show_progress_bar, number_of_workers=number_of_workers ) biases = biases[:, 0] # parse predictions predicted_values = trends + biases predicted = np.c_[ predicted_values, points[:, 0], points[:, 1] ] return predicted
[docs] def plot_experimental_bias_model(self): """ Plots experimental variogram of bias. """ self.bias_experimental_model.plot( semivariance=True, covariance=False, variance=False )
[docs] def plot_theoretical_bias_model(self): """ Plots theoretical semivariogram of bias. """ self.bias_model.plot()
[docs] def plot_trend_surfaces(self): """ Plots initial observations, trend surface, and bias surface. """ fig, ax = plt.subplots(3) # Base surface base_x = self.known_points[:, 0] base_y = self.known_points[:, 1] base_z = self.known_points[:, 2] base_min = np.min(base_z) base_max = np.max(base_z) base_std = np.std(base_z) * 2 base_plot = ax[0].scatter( x=base_x, y=base_y, c=base_z, cmap='gist_earth', vmin=base_min, vmax=base_max ) fig.colorbar(mappable=base_plot, ax=ax[0]) # trend surface trend_plot = ax[1].scatter( x=base_x, y=base_y, c=self.trend_values, cmap='gist_earth', vmin=base_min, vmax=base_max ) fig.colorbar(mappable=trend_plot, ax=ax[1]) # error bias_plot = ax[2].scatter( x=base_x, y=base_y, c=base_z - self.trend_values, cmap='cool', vmin=-base_std, vmax=base_std ) fig.colorbar(mappable=bias_plot, ax=ax[2]) plt.show()