"""
Centroid-based Poisson Kriging function.
Authors
-------
1. Szymon Moliński | @SimonMolinsky
"""
from typing import Union, Hashable, Dict
import numpy as np
from pyinterpolate.core.data_models.point_support import PointSupport
from pyinterpolate.kriging.block.weights import pk_weights_array
from pyinterpolate.kriging.utils.point_kriging_solve import solve_weights
from pyinterpolate.semivariogram.theoretical.classes.theoretical_variogram import TheoreticalVariogram
from pyinterpolate.transform.select_poisson_kriging_data import select_centroid_poisson_kriging_data
from pyinterpolate.transform.statistical import sem_to_cov
[docs]
def centroid_poisson_kriging(semivariogram_model: TheoreticalVariogram,
point_support: PointSupport,
unknown_block_index: Union[str, Hashable],
number_of_neighbors: int,
neighbors_range: float = None,
is_weighted_by_point_support=True,
raise_when_negative_prediction=True,
raise_when_negative_error=True,
allow_lsa=False,
negative_prediction_to_zero=False) -> Dict:
"""
Function performs centroid-based Poisson Kriging of blocks (areal) data.
Parameters
----------
semivariogram_model : TheoreticalVariogram
A fitted variogram.
point_support : PointSupport
The point support of polygons.
unknown_block_index : Hashable
The id of the block with the unknown value.
number_of_neighbors : int
The minimum number of neighbours that can potentially affect
the unknown block.
neighbors_range : float, optional
The maximum range for neighbors search. If not provided then it is
read from the semivariogram model.
is_weighted_by_point_support : bool, default = True
Are distances between blocks weighted by the point support?
raise_when_negative_prediction : bool, default=True
Raise error when prediction is negative.
raise_when_negative_error : bool, default=True
Raise error when prediction error is negative.
allow_lsa : bool, default=False
Allows the approximation of kriging weights based on the OLS
algorithm. We don't recommend set it to ``True``
if you don't know what are you doing. This parameter can be useful
when you have clusters in your dataset,
that can lead to singular or near-singular matrix creation.
negative_prediction_to_zero : bool, default=False
While prediction is negative, then set it to 0.
Returns
-------
results : Dict
Block index, prediction, error: ``{"block id", "zhat", "sig"}``
Raises
------
ValueError
Raised when prediction or prediction error are negative.
Examples
--------
>>> import os
>>> import geopandas as gpd
>>> from pyinterpolate import (
... centroid_poisson_kriging,
... Blocks,
... ExperimentalVariogram,
... PointSupport,
... TheoreticalVariogram
... )
>>>
>>>
>>> FILENAME = 'cancer_data.gpkg'
>>> LAYER_NAME = 'areas'
>>> DS = gpd.read_file(FILENAME, layer=LAYER_NAME)
>>> AREA_VALUES = 'rate'
>>> AREA_INDEX = 'FIPS'
>>> AREA_GEOMETRY = 'geometry'
>>> PS_LAYER_NAME = 'points'
>>> PS_VALUES = 'POP10'
>>> PS_GEOMETRY = 'geometry'
>>> PS = gpd.read_file(FILENAME, layer=PS_LAYER_NAME)
>>>
>>> CANCER_DATA = {
... 'ds': DS,
... 'index_column_name': AREA_INDEX,
... 'value_column_name': AREA_VALUES,
... 'geometry_column_name': AREA_GEOMETRY
... }
>>> POINT_SUPPORT_DATA = {
... 'ps': PS,
... 'value_column_name': PS_VALUES,
... 'geometry_column_name': PS_GEOMETRY
... }
>>> BLOCKS = Blocks(**CANCER_DATA)
>>> indexes = BLOCKS.block_indexes
>>>
>>> PS = PointSupport(
... points=POINT_SUPPORT_DATA['ps'],
... ps_blocks=BLOCKS,
... points_value_column=POINT_SUPPORT_DATA['value_column_name'],
... points_geometry_column=POINT_SUPPORT_DATA['geometry_column_name']
... )
>>>
>>> EXPERIMENTAL = ExperimentalVariogram(
... ds=BLOCKS.representative_points_array(),
... step_size=40000,
... max_range=300001
... )
>>>
>>> THEO = TheoreticalVariogram()
>>> THEO.autofit(
... experimental_variogram=EXPERIMENTAL,
... sill=150
... )
>>> centroid_pk = centroid_poisson_kriging(
... semivariogram_model=THEO,
... point_support=PS,
... unknown_block_index=34017,
... number_of_neighbors=8
... )
>>> print(centroid_pk)
{'block_id': 34017, 'zhat': 134.13113195517153, 'sig': 11.795520367595483}
"""
# Kriging data
kriging_data = select_centroid_poisson_kriging_data(
block_index=unknown_block_index,
point_support=point_support,
semivariogram_model=semivariogram_model,
number_of_neighbors=number_of_neighbors,
weighted=is_weighted_by_point_support,
neighbors_range=neighbors_range
)
sill = semivariogram_model.sill
n = len(kriging_data)
distances = kriging_data.distances
values = kriging_data.values
kindexes = kriging_data.neighbors_indexes
partial_semivars = semivariogram_model.predict(distances)
pcovars = sem_to_cov(partial_semivars, sill)
covars = np.ones(len(pcovars) + 1)
covars[:-1] = pcovars
covars = covars.transpose()
# Distances between known blocks
block_distances = point_support.get_distances_between_known_blocks(
block_ids=kindexes
)
known_blocks_semivars = semivariogram_model.predict(
block_distances.flatten()
)
predicted = np.array(known_blocks_semivars.reshape(n, n))
predicted = sem_to_cov(predicted, sill)
# Add diagonal weights to predicted semivars array
totals = point_support.point_support_totals(
kindexes
)
weights = pk_weights_array(predicted.shape,
values,
totals)
weighted_and_predicted = predicted + weights
# Prepare matrix for solving kriging system
ones_col = np.ones((weighted_and_predicted.shape[0], 1))
weighted_and_predicted = np.c_[weighted_and_predicted, ones_col]
ones_row = np.ones((1, weighted_and_predicted.shape[1]))
ones_row[0][-1] = 0
kriging_weights = np.r_[weighted_and_predicted, ones_row]
# Solve Kriging system
try:
output_weights = solve_weights(weights=kriging_weights,
k=covars,
allow_lsa=allow_lsa)
except np.linalg.LinAlgError as _:
msg = ('Singular matrix in Kriging system detected, check if you '
'have duplicated coordinates in the input dataset.')
raise RuntimeError(msg)
zhat = values.dot(output_weights[:-1])
if zhat < 0:
if raise_when_negative_prediction:
raise ValueError(f'Predicted value is {zhat} and it should not '
f'be lower than 0. Check your sampling '
f'grid, samples, number of neighbors or '
f'semivariogram model type.')
if negative_prediction_to_zero:
zhat = 0
sigmasq = np.matmul(output_weights.T, covars)
if sigmasq < 0:
if raise_when_negative_error:
raise ValueError(f'Predicted error value is {sigmasq} and it '
f'should not be lower than 0. '
f'Check your sampling grid, samples, number of '
f'neighbors or semivariogram model type.'
f'Error is related to the block with index '
f'{unknown_block_index}')
sigma = np.nan
else:
sigma = np.sqrt(sigmasq)
# Prepare output
results = {
'block_id': unknown_block_index,
'zhat': zhat,
'sig': sigma
}
return results