Source code for pyinterpolate.core.pipelines.interpolate

from numpy.typing import ArrayLike
import os

import dask
import numpy as np
from tqdm import tqdm
from dask.diagnostics import ProgressBar

from pyinterpolate.kriging.point.ordinary import ok_calc
from pyinterpolate.semivariogram.theoretical.classes.theoretical_variogram import TheoreticalVariogram
from pyinterpolate.transform.geo import geometry_and_values_array


[docs] def interpolate_points( theoretical_model: TheoreticalVariogram, unknown_locations: ArrayLike, known_locations: ArrayLike = None, known_values: ArrayLike = None, known_geometries: ArrayLike = None, neighbors_range=None, no_neighbors=4, max_tick=5., use_all_neighbors_in_range=False, allow_approximate_solutions=False, progress_bar=True ) -> np.ndarray: """ Function predicts values at unknown locations with Ordinary Kriging. Parameters ---------- theoretical_model : TheoreticalVariogram Fitted theoretical variogram model. unknown_locations : numpy array Points where you want to estimate value ``[(x, y), ...] <-> [(lon, lat), ...]``. known_locations : numpy array, optional The known locations: ``[x, y, value]``. known_values : ArrayLike, optional Observation in the i-th geometry (from ``known_geometries``). Optional parameter, if not given then ``known_locations`` must be provided. known_geometries : ArrayLike, optional Array or similar structure with geometries. It must have the same length as ``known_values``. Optional parameter, if not given then ``known_locations`` must be provided. Point type geometry. neighbors_range : float, default=None The maximum distance where we search for the neighbors. If ``None`` is given then range is selected from the theoretical model's ``rang`` attribute. no_neighbors : int, default = 4 The number of the **n-closest neighbors** used for interpolation. max_tick : float, default=5. Maximum number of degrees for neighbors search angle. use_all_neighbors_in_range : bool, default = False ``True``: if the real number of neighbors within the ``neighbors_range`` is greater than the ``number_of_neighbors`` parameter then take all of them anyway. allow_approximate_solutions : bool, default=False Allows the approximation of kriging weights based on the OLS algorithm. We don't recommend set it to ``True`` if you don't know what are you doing. This parameter can be useful when you have clusters in your dataset, that can lead to singular or near-singular matrix creation. But the better idea is to get rid of those clusters. progress_bar : bool, default = True Shows progress bar Returns ------- : numpy array ``[predicted value, variance error, longitude (x), latitude (y)]`` Examples -------- >>> import geopandas as gpd >>> import numpy as np >>> import pandas as pd >>> >>> from pyinterpolate import (build_experimental_variogram, ... build_theoretical_variogram) >>> from pyinterpolate.core.pipelines.interpolate import interpolate_points >>> >>> dem = gpd.read_file('dem.gpkg') >>> unknown_locations = gpd.read_file('unknown_locations.gpkg') >>> step_size = 500 >>> max_range = 10000 >>> exp_variogram = build_experimental_variogram( ... values=dem['dem'], ... geometries=dem['geometry'], ... step_size=step_size, ... max_range=max_range ... ) >>> theo_variogram = build_theoretical_variogram(exp_variogram) >>> interp = interpolate_points( ... theoretical_model=theo_variogram, ... unknown_locations=unknown_locations['geometry'], ... known_values=dem['dem'], ... known_geometries=dem['geometry'] ... ) >>> print(interp[0]) [7.91222896e+01 9.72740449e+01 2.38012302e+05 5.51466805e+05] """ if known_locations is None: known_locations = geometry_and_values_array( geometry=known_geometries, values=known_values ) interpolated_results = [] _disable_progress_bar = not progress_bar for upoints in tqdm(unknown_locations, disable=_disable_progress_bar): res = ok_calc( theoretical_model=theoretical_model, known_locations=known_locations, unknown_location=upoints, neighbors_range=neighbors_range, no_neighbors=no_neighbors, max_tick=max_tick, use_all_neighbors_in_range=use_all_neighbors_in_range, allow_approximate_solutions=allow_approximate_solutions ) interpolated_results.append( res ) return np.array(interpolated_results)
[docs] def interpolate_points_dask( theoretical_model: TheoreticalVariogram, unknown_locations: ArrayLike, known_locations: ArrayLike = None, known_values: ArrayLike = None, known_geometries: ArrayLike = None, neighbors_range=None, no_neighbors=4, max_tick=5., use_all_neighbors_in_range=False, allow_approximate_solutions=False, number_of_workers=1, progress_bar=True ) -> np.ndarray: """ Function predicts values at unknown locations with Ordinary Kriging using Dask backend, makes sense when you must interpolate large number of points. Parameters ---------- theoretical_model : TheoreticalVariogram Fitted theoretical variogram model. unknown_locations : numpy array Points where you want to estimate value ``[(x, y), ...] <-> [(lon, lat), ...]``. known_locations : numpy array, optional The known locations: ``[x, y, value]``. known_values : ArrayLike, optional Observation in the i-th geometry (from ``known_geometries``). Optional parameter, if not given then ``known_locations`` must be provided. known_geometries : ArrayLike, optional Array or similar structure with geometries. It must have the same length as ``known_values``. Optional parameter, if not given then ``known_locations`` must be provided. Point type geometry. neighbors_range : float, default=None The maximum distance where we search for the neighbors. If ``None`` is given then range is selected from the theoretical model's ``rang`` attribute. no_neighbors : int, default = 4 The number of the **n-closest neighbors** used for interpolation. max_tick : float, default=5. Maximum number of degrees for neighbors search angle. use_all_neighbors_in_range : bool, default = False ``True``: if the real number of neighbors within the ``neighbors_range`` is greater than the ``number_of_neighbors`` parameter then take all of them anyway. allow_approximate_solutions : bool, default=False Allows the approximation of kriging weights based on the OLS algorithm. We don't recommend set it to ``True`` if you don't know what are you doing. This parameter can be useful when you have clusters in your dataset, that can lead to singular or near-singular matrix creation. But the better idea is to get rid of those clusters. number_of_workers : int, default = 1 How many processing units can be used for predictions. Increase it only for a very large number of interpolated points (~10k+). progress_bar : bool, default = True Shows progress bar Returns ------- : numpy array ``[predicted value, variance error, longitude (x), latitude (y)]`` Examples -------- >>> import geopandas as gpd >>> import numpy as np >>> import pandas as pd >>> >>> from pyinterpolate import (build_experimental_variogram, ... build_theoretical_variogram) >>> from pyinterpolate.core.pipelines.interpolate import interpolate_points_dask >>> >>> dem = gpd.read_file('dem.gpkg') >>> unknown_locations = gpd.read_file('unknown_locations.gpkg') >>> step_size = 500 >>> max_range = 10000 >>> exp_variogram = build_experimental_variogram( ... values=dem['dem'], ... geometries=dem['geometry'], ... step_size=step_size, ... max_range=max_range ... ) >>> theo_variogram = build_theoretical_variogram(exp_variogram) >>> interp = interpolate_points_dask( ... theoretical_model=theo_variogram, ... unknown_locations=unknown_locations['geometry'], ... known_values=dem['dem'], ... known_geometries=dem['geometry'] ... ) >>> print(interp[0]) [7.91222896e+01 9.72740449e+01 2.38012302e+05 5.51466805e+05] """ if known_locations is None: known_locations = geometry_and_values_array( geometry=known_geometries, values=known_values ) if number_of_workers == -1: core_num = os.cpu_count() if core_num > 1: number_of_workers = core_num - 1 # Safety reasons else: number_of_workers = core_num if number_of_workers == 1: results = interpolate_points( theoretical_model=theoretical_model, known_locations=known_locations, unknown_locations=unknown_locations, neighbors_range=neighbors_range, no_neighbors=no_neighbors, max_tick=max_tick, use_all_neighbors_in_range=use_all_neighbors_in_range, allow_approximate_solutions=allow_approximate_solutions, progress_bar=progress_bar ) return results else: pbar = ProgressBar() pbar.register() results = [] for upoints in unknown_locations: prediction = dask.delayed(ok_calc)( theoretical_model=theoretical_model, known_locations=known_locations, unknown_location=upoints, neighbors_range=neighbors_range, no_neighbors=no_neighbors, max_tick=max_tick, use_all_neighbors_in_range=use_all_neighbors_in_range, allow_approximate_solutions=allow_approximate_solutions ) results.append(prediction) predictions = dask.delayed()(results) predictions = predictions.compute(num_workers=number_of_workers) return np.array(predictions)