Source code for pyinterpolate.kriging.block.area_to_point_poisson_kriging

"""
Area-to-point Poisson Kriging function.

Authors
-------
1. Szymon Moliński | @SimonMolinsky
"""
from typing import Union, Hashable

import numpy as np

from pyinterpolate.core.data_models.point_support import PointSupport
from pyinterpolate.kriging.block.weights import pk_weights_array
from pyinterpolate.semivariogram.deconvolution.block_to_block_semivariance import \
    weighted_avg_point_support_semivariances
from pyinterpolate.semivariogram.theoretical.classes.theoretical_variogram import \
    TheoreticalVariogram
from pyinterpolate.transform.select_poisson_kriging_data import \
    select_poisson_kriging_data
from pyinterpolate.transform.statistical import sem_to_cov
from pyinterpolate.transform.transform import add_ones


[docs] def area_to_point_pk(semivariogram_model: TheoreticalVariogram, point_support: PointSupport, unknown_block_index: Union[str, Hashable], number_of_neighbors: int, neighbors_range: float = None, raise_when_negative_prediction=True, raise_when_negative_error=True, err_to_nan=True, negative_prediction_to_zero=False): """ Function predicts point-support value in the unknown location based on the area-to-point Poisson Kriging Parameters ---------- semivariogram_model : TheoreticalVariogram A fitted variogram. point_support : PointSupport The point support of polygons. unknown_block_index : Hashable The id of the block with the unknown value. number_of_neighbors : int The minimum number of neighbours that can potentially affect the unknown block. neighbors_range : float, optional The maximum range for neighbors search. If not provided then it is read from the semivariogram model. raise_when_negative_prediction : bool, default=True Raise error when prediction is negative. raise_when_negative_error : bool, default=True Raise error when prediction error is negative. err_to_nan : bool, default=True When point interpolation returns ``ValueError`` then set prediction or variance error to ``NaN``. negative_prediction_to_zero : bool, default=False While prediction is negative, then set it to 0. Returns ------- results : Dict[numpy array] ```{"block_id": [["x", "y", "zhat", "sig"], ...]}`` Raises ------ ValueError Prediction or prediction error are negative. Examples -------- >>> import os >>> import geopandas as gpd >>> from pyinterpolate import ( ... area_to_point_pk, ... Blocks, ... ExperimentalVariogram, ... PointSupport, ... TheoreticalVariogram ... ) >>> >>> >>> FILENAME = 'cancer_data.gpkg' >>> LAYER_NAME = 'areas' >>> DS = gpd.read_file(FILENAME, layer=LAYER_NAME) >>> AREA_VALUES = 'rate' >>> AREA_INDEX = 'FIPS' >>> AREA_GEOMETRY = 'geometry' >>> PS_LAYER_NAME = 'points' >>> PS_VALUES = 'POP10' >>> PS_GEOMETRY = 'geometry' >>> PS = gpd.read_file(FILENAME, layer=PS_LAYER_NAME) >>> >>> CANCER_DATA = { ... 'ds': DS, ... 'index_column_name': AREA_INDEX, ... 'value_column_name': AREA_VALUES, ... 'geometry_column_name': AREA_GEOMETRY ... } >>> POINT_SUPPORT_DATA = { ... 'ps': PS, ... 'value_column_name': PS_VALUES, ... 'geometry_column_name': PS_GEOMETRY ... } >>> BLOCKS = Blocks(**CANCER_DATA) >>> indexes = BLOCKS.block_indexes >>> >>> PS = PointSupport( ... points=POINT_SUPPORT_DATA['ps'], ... ps_blocks=BLOCKS, ... points_value_column=POINT_SUPPORT_DATA['value_column_name'], ... points_geometry_column=POINT_SUPPORT_DATA['geometry_column_name'] ... ) >>> >>> EXPERIMENTAL = ExperimentalVariogram( ... ds=BLOCKS.representative_points_array(), ... step_size=40000, ... max_range=300001 ... ) >>> >>> THEO = TheoreticalVariogram() >>> THEO.autofit( ... experimental_variogram=EXPERIMENTAL, ... sill=150 ... ) >>> atp_pk = area_to_point_pk( ... semivariogram_model=THEO, ... point_support=PS, ... unknown_block_index=34017, ... number_of_neighbors=8 ... ) >>> print(atp_pk) {34017: array([[1.82287362e+06, 4.06124500e+05, 1.34010668e+02, 5.47866897e+00]])} """ # Prepare Kriging Data # { # known block id: [ # (unknown x, unknown y), # [unknown val, known val, distance between points] # ] # } kriging_data = select_poisson_kriging_data( block_index=unknown_block_index, point_support=point_support, semivariogram_model=semivariogram_model, number_of_neighbors=number_of_neighbors, neighbors_range=neighbors_range ) closest_neighbors = kriging_data.neighbors_unique_indexes poss_neighbors = point_support.no_possible_neighbors if poss_neighbors > 0: if poss_neighbors < len(closest_neighbors): # todo? Move this step into select_poisson_kriging_data closest_neighbors = closest_neighbors[:poss_neighbors] kriging_data._neighbors_unique_indexes = closest_neighbors n = len(closest_neighbors) block_points = point_support.get_points_array( block_id=unknown_block_index ) tot_unknown_value = np.sum(block_points[:, 2]) sill = semivariogram_model.sill # Get block to point weighted semivariances b2p_semivariances = kriging_data.point_to_block_ps_semivariance_array() # Transform to covariances avg_b2p_covariances = sem_to_cov(b2p_semivariances, sill) # Add ones avg_b2p_covariances = add_ones( avg_b2p_covariances ) # Distances between neighboring areas # [ # (known block id a, known block id b), # pt a val, pt b val, distance between points # ] distances_between_neighboring_point_supports = kriging_data.distances_between_neighboring_point_supports( point_support=point_support ) # Semivariances between neighbors # (known block id a, known block id b) -> semivariance (Series) semivariances_between_neighboring_point_supports = weighted_avg_point_support_semivariances( semivariogram_model, distances_between_neighboring_point_supports, index_col='blocks_pair', val1_col='block_a_value', val2_col='block_b_value', dist_col='distance' ) semivariances_nn = semivariances_between_neighboring_point_supports.to_numpy() predicted = sem_to_cov(semivariances_nn, sill) predicted = predicted.reshape((n, n)) # Add diagonal weights block_values = point_support.blocks.get_blocks_values( indexes=closest_neighbors ) totals = point_support.point_support_totals(blocks=closest_neighbors) p_weights = pk_weights_array(predicted.shape, block_values, totals) weighted_and_predicted = predicted + p_weights # Prepare weights matrix p_ones = np.ones((predicted.shape[0], 1)) predicted_with_ones_col = np.c_[weighted_and_predicted, p_ones] p_ones_row = np.ones((1, predicted_with_ones_col.shape[1])) p_ones_row[0][-1] = 0. weights = np.r_[predicted_with_ones_col, p_ones_row] # Solve Kriging systems # Each point of unknown area represents different kriging system # Transpose points matrix transposed = (np.nan_to_num(avg_b2p_covariances)).T # Find covariance within unknown block's point support distances_within_unknown_block_point_support = kriging_data.distances_within_unknown_block( point_support=point_support ) semivariance_within_unknown_block = weighted_avg_point_support_semivariances( semivariogram_model=semivariogram_model, distances_between_neighboring_point_supports=distances_within_unknown_block_point_support, index_col='block_id', val1_col='point_a_value', val2_col='point_b_value', dist_col='distance' ) covariance_within_unknown = sem_to_cov( semivariance_within_unknown_block.to_numpy(), sill ) predicted_points = [] # todo: for loop into a function for idx, upoint in enumerate(block_points): vals = transposed[idx] try: w = np.linalg.solve(weights, vals) except TypeError: weights = weights.astype(float) vals = vals.astype(float) w = np.linalg.solve(weights, vals) zhat = block_values.dot(w[:-1]) if zhat < 0: if raise_when_negative_prediction: raise ValueError(f'Predicted value is {zhat} and it ' f'should not be lower than 0. Check your ' f'sampling grid, samples, number of ' f'neighbors or semivariogram model type.') if err_to_nan: predicted_points.append( [upoint[0], upoint[1], np.nan, np.nan] ) continue if negative_prediction_to_zero: predicted_points.append( [upoint[0], upoint[1], 0, np.nan] ) continue point_pop = upoint[2] zhat = (zhat * point_pop) / tot_unknown_value # Calculate error _sigmasq = np.matmul(w.T, vals) sigmasq = covariance_within_unknown[0] - _sigmasq if sigmasq < 0: if raise_when_negative_error: raise ValueError(f'Predicted error value is {sigmasq} and ' f'it should not be lower than 0. ' f'Check your sampling grid, samples, ' f'number of neighbors or semivariogram ' f'model type.') if err_to_nan: predicted_points.append( [upoint[0], upoint[1], zhat, np.nan] ) continue else: sigma = 0 else: sigma = np.sqrt(sigmasq) predicted_points.append([upoint[0], upoint[1], zhat, sigma]) parr = np.asarray(predicted_points) return {unknown_block_index: parr}