Source code for pyinterpolate.semivariogram.deconvolution.aggregated_variogram

"""
Class to work with blocks & point support datasets and transformations.

Authors
-------
1. Szymon Moliński | @SimonMolinsky

"""
from copy import deepcopy
from typing import Union

import numpy as np
from matplotlib import pyplot as plt

from pyinterpolate.semivariogram.experimental.classes.experimental_variogram import ExperimentalVariogram
from pyinterpolate.core.data_models.blocks import Blocks
from pyinterpolate.core.data_models.point_support import PointSupport
from pyinterpolate.core.data_models.point_support_distances import \
    PointSupportDistance
from pyinterpolate.semivariogram.deconvolution.avg_inblock_semivariance import \
    calculate_average_semivariance
from pyinterpolate.semivariogram.deconvolution.block_to_block_semivariance import \
    calculate_block_to_block_semivariance, \
    average_block_to_block_semivariances
from pyinterpolate.semivariogram.deconvolution.inblock import \
    calculate_inblock_semivariance
from pyinterpolate.semivariogram.theoretical.classes.theoretical_variogram import \
    TheoreticalVariogram


[docs] class AggregatedVariogram: """ Class calculates semivariances between blocks of aggregated data. Parameters ---------- blocks : Blocks Aggregated data | choropleth map. point_support : PointSupport Point supports of ``blocks``. step_size : float Step size between lags. max_range : float Maximal distance of analysis. nugget : float, default = 0 The nugget of blocks data. direction : float (in range [0, 360]), optional Direction of ``blocks`` semivariogram, values from 0 to 360 degrees: - 0 or 180: is E-W, - 90 or 270 is N-S, - 45 or 225 is NE-SW, - 135 or 315 is NW-SE. tolerance : float (in range [0, 1]), optional If ``tolerance`` is 0 then points must be placed at a single line with the beginning in the origin of the coordinate system and the direction given by y axis and direction parameter. If ``tolerance`` is ``> 0`` then the bin is selected as an elliptical area with major axis pointed in the same direction as the line for ``0`` tolerance. variogram_weighting_method : str, default = "closest" Method used to weight error at a given lags. Available methods: - **equal**: no weighting, - **closest**: lags at a close range have bigger weights, - **distant**: lags that are further away have bigger custom_weights, - **dense**: error is weighted by the number of point pairs within a lag - more pairs, lower weight. models_group : str or list, default='safe' Models group to test: - 'all' - the same as list with all models, - 'safe' - ``['linear', 'power', 'spherical']`` - as a list: multiple model types to test - as a single model type from the list: - 'circular', - 'cubic', - 'exponential', - 'gaussian', - 'linear', - 'power', - 'spherical'. verbose : bool, default = False Print steps performed by the algorithm. log_process : bool, default = False Log process info (Level ``DEBUG``). Attributes ---------- blocks : Blocks See ``aggregared_data`` parameter. step_size : float See ``step_size`` parameter. max_range : float See ``max_range`` parameter. agg_lags : numpy array Lags calculated as a set of equidistant points from ``step_size`` to ``max_range`` with a step of size ``step_size``. tolerance : float, default = 1 See ``tolerance`` parameter. direction : float, default = 0 See ``direction`` parameter. nugget : float, default = 0 See ``nugget`` parameter. models_group : str or list See ``models_group`` parameter. point_support : PointSupport See the ``point_support`` parameter. weighting_method : str, default = 'closest' See the ``variogram_weighting_method`` parameter. verbose : bool, default = False See ``verbose`` parameter. log_process : bool, default = False See the ``log_process`` parameter. experimental_variogram : ExperimentalVariogram The experimental variogram calculated from blocks (their coordinates). theoretical_model : TheoreticalVariogram The theoretical model derived from blocks' coordinates. inblock_semivariance : Dict ``{area id: the average inblock semivariance}`` avg_inblock_semivariance : numpy array ``[lag, average inblocks semivariances, number of blocks within a lag]`` block_to_block_semivariance : Dict ``{(block i, block j): [distance, semivariance, number of point pairs between blocks]}`` avg_block_to_block_semivariance : numpy array ``[lag, semivariance, number of point pairs between blocks]`` regularized_variogram : numpy array ``[lag, semivariance]`` distances_between_blocks : Dict Weighted distances between all blocks: ``{block id : [distances to other blocks]}`` Methods ------- regularize() Method performs semivariogram regularization. show_semivariograms() Shows experimental variogram, theoretical model, average inblock semivariance, average block to block semivariance and regularized variogram. References ---------- [1] Goovaerts P., Kriging and Semivariogram Deconvolution in the Presence of Irregular Geographical Units, Mathematical Geology 40(1), 101-128, 2008 Examples -------- >>> import os >>> import geopandas as gpd >>> from pyinterpolate import ( ... AggregatedVariogram, ... Blocks, ... PointSupport ... ) >>> >>> >>> FILENAME = 'cancer_data.gpkg' >>> LAYER_NAME = 'areas' >>> DS = gpd.read_file(FILENAME, layer=LAYER_NAME) >>> AREA_VALUES = 'rate' >>> AREA_INDEX = 'FIPS' >>> AREA_GEOMETRY = 'geometry' >>> PS_LAYER_NAME = 'points' >>> PS_VALUES = 'POP10' >>> PS_GEOMETRY = 'geometry' >>> PS = gpd.read_file(FILENAME, layer=PS_LAYER_NAME) >>> >>> CANCER_DATA = { ... 'ds': DS, ... 'index_column_name': AREA_INDEX, ... 'value_column_name': AREA_VALUES, ... 'geometry_column_name': AREA_GEOMETRY ... } >>> POINT_SUPPORT_DATA = { ... 'ps': PS, ... 'value_column_name': PS_VALUES, ... 'geometry_column_name': PS_GEOMETRY ... } >>> BLOCKS = Blocks(**CANCER_DATA) >>> indexes = BLOCKS.block_indexes >>> >>> PS = PointSupport( ... points=POINT_SUPPORT_DATA['ps'], ... ps_blocks=BLOCKS, ... points_value_column=POINT_SUPPORT_DATA['value_column_name'], ... points_geometry_column=POINT_SUPPORT_DATA['geometry_column_name'] ... ) >>> STEP_SIZE = 20000 >>> MAX_RANGE = 300000 >>> ag = AggregatedVariogram( ... blocks=BLOCKS, ... point_support=PS, ... step_size=STEP_SIZE, ... max_range=MAX_RANGE ... ) >>> reg_variogram = ag.regularize() >>> print(reg_variogram[0]) [20000. 53.13549729] """ def __init__(self, blocks: Blocks, point_support: PointSupport, step_size: float, max_range: float, nugget: float = 0, direction: float = None, tolerance: float = None, variogram_weighting_method: str = 'closest', models_group: Union[str, list] = 'safe', verbose: bool = False, log_process: bool = False): self.regularized_variogram = None self.blocks = blocks self.step_size = step_size self.max_range = max_range self.nugget = nugget self.agg_lags = np.arange(self.step_size, self.max_range, self.step_size) self.tolerance = tolerance self.direction = direction self.point_support = point_support self.point_support_distances = None self.weighting_method = variogram_weighting_method self.models_group = models_group # Process info self.verbose = verbose self.log_process = log_process # Variogram models # from blocks self.experimental_variogram = None self.theoretical_model = None # from point support within a single block self.inblock_semivariance = None # from inblock semivariances; lags -> from blocks self.avg_inblock_semivariance = None # from point supports between blocks self.block_to_block_semivariance = None # average from point supports between block pairs self.avg_block_to_block_semivariance = None # Model parameters self.distances_between_blocks = None
[docs] def calculate_avg_inblock_semivariance(self) -> np.ndarray: r""" Method calculates the average semivariance within blocks :math:`\gamma_h(v, v)`. The average inblock semivariance is calculated as: .. math:: \gamma_h(v, v) = \frac{1}{(2*N(h))} \sum_{a=1}^{N(h)} [\gamma(v_{a}, v_{a}) + \gamma(v_{a+h}, v_{a+h})] where: - :math:`\gamma(v_{a}, v_{a})` is a semivariance within a block :math:`a`, - :math:`\gamma(v_{a+h}, v_{a+h})` is samivariance within a block at a distance :math:`h` from the block :math:`a`. Returns ------- avg_inblock_semivariance : numpy array ``[lag, semivariance, number of block pairs]`` """ # Calculate inblock semivariance self.inblock_semivariance = calculate_inblock_semivariance( self.point_support, self.theoretical_model ) # Calculate distances between blocks self.point_support_distances = PointSupportDistance(verbose=False) if self.distances_between_blocks is None: self.distances_between_blocks = self.point_support_distances.calculate_weighted_block_to_block_distances( point_support=self.point_support, return_distances=True ) # Calc average semivariance avg_inblock_semivariance = calculate_average_semivariance( self.distances_between_blocks, self.inblock_semivariance, step_size=self.step_size, max_range=self.max_range ) return avg_inblock_semivariance
[docs] def calculate_avg_semivariance_between_blocks(self) -> np.ndarray: r""" Function calculates semivariance between areas based on their division into smaller blocks. It is :math:`\gamma(v, v_h)` - semivariogram value between any two blocks separated by the distance :math:`h`. Returns ------- avg_block_to_block_semivariance : numpy array The average semivariance between the neighboring blocks point-supports: ``[lag, semivariance, number of block pairs within a range]``. Notes ----- Block-to-block semivariance is calculated as: .. math:: \gamma(v_{a}, v_{a+h})=\frac{1} {P_{a}P_{a+h}}\sum_{s=1}^{P_{a}}\sum_{s'=1}^{P_{a+h}}\gamma(u_{s}, u_{s'}) where: - :math:`\gamma(v_{a}, v_{a+h})` - block-to-block semivariance of the block :math:`a` and paired block :math:`a+h`. - :math:`P_{a}` and :math:`P_{a+h}` - number of support points within the block :math:`a` and block :math:`a+h`. - :math:`\gamma(u_{s}, u_{s'})` - semivariance of point supports between blocks. Then average block-to-block semivariance is calculated as: .. math:: \gamma_{h}(v, v_{h}) = \frac{1}{N(h)}\sum_{a=1}^{N(h)}\gamma(v_{a}, v_{a+h}) where: - :math:`\gamma_{h}(v, v_{h})` - averaged block-to-block semivariances for a lag :math:`h`, - :math:`\gamma(v_{a}, v_{a+h})` - semivariance of block :math:`a` and paired block at a distance :math:`h`. """ # Check if distances between blocks are calculated if self.distances_between_blocks is None: if self.point_support_distances.weighted_block_to_block_distances is None: self.point_support_distances.calculate_weighted_block_to_block_distances( point_support=self.point_support, return_distances=False ) self.distances_between_blocks = self.point_support_distances.weighted_distances else: self.distances_between_blocks = self.point_support_distances.weighted_distances # {(block id a, block id b): # [distance, semivariance, number of point pairs between blocks]} self.block_to_block_semivariance = calculate_block_to_block_semivariance( self.point_support, self.distances_between_blocks, self.theoretical_model ) semivars_arr = np.array( list(self.block_to_block_semivariance.values()), dtype=float ) avg_block_to_block_semivariance = average_block_to_block_semivariances( semivariances_array=semivars_arr, lags=self.agg_lags ) return avg_block_to_block_semivariance
[docs] def regularize(self, average_inblock_semivariances: np.ndarray = None, semivariance_between_point_supports: np.ndarray = None, experimental_block_semivariances: np.ndarray = None, theoretical_block_model: TheoreticalVariogram = None ) -> np.ndarray: r""" Method regularizes point support model. Procedure is described in [1]. Parameters ---------- average_inblock_semivariances : np.ndarray, default = None The mean semivariance between blocks. See Notes to learn more. semivariance_between_point_supports : np.ndarray, default = None Semivariance between all blocks calculated from the theoretical model. experimental_block_semivariances : np.ndarray, default = None The experimental semivariance between area coordinates. theoretical_block_model : TheoreticalVariogram, default = None A modeled variogram. Returns ------- regularized_model : numpy array ``[lag, semivariance]`` Notes ----- Function has the form: .. math:: \gamma_v(h) = \gamma(v, v_h) - \gamma_h(v, v) where: - :math:`\gamma_v(h)` - the regularized variogram, - :math:`\gamma(v, v_h)` - a variogram value between any two blocks separated by the distance :math:`h` (calculated from their point support), - :math:`\gamma_h(v, v)` - average inblock semivariance between blocks. Average inblock semivariance between blocks: .. math:: \gamma_h(v, v) = \frac{1}{(2*N(h))} \sum_{a=1}^{N(h)} [\gamma(v_{a}, v_{a}) + \gamma(v_{a+h}, v_{a+h})] where :math:`\gamma(v_{a}, v_{a})` and :math:`\gamma(v_{a+h}, v_{a+h})` are inblock semivariances of block :math:`a` and block :math:`a+h` separated by the distance :math:`h`. References ---------- [1] Goovaerts P., Kriging and Semivariogram Deconvolution in the Presence of Irregular Geographical Units, Mathematical Geology 40(1), 101-128, 2008 """ # Set all variograms and models if experimental_block_semivariances is None: self.experimental_variogram = self._get_experimental_variogram() else: self.experimental_variogram = experimental_block_semivariances if theoretical_block_model is None: self.theoretical_model = self._fit_theoretical_model() else: self.theoretical_model = theoretical_block_model # gamma_h(v, v) if average_inblock_semivariances is None: self.avg_inblock_semivariance = self.calculate_avg_inblock_semivariance() else: self.avg_inblock_semivariance = average_inblock_semivariances # gamma(v, v_h) if semivariance_between_point_supports is None: self.avg_block_to_block_semivariance = self.calculate_avg_semivariance_between_blocks() else: self.avg_block_to_block_semivariance = semivariance_between_point_supports # regularize variogram self.regularized_variogram = self.regularize_variogram() return self.regularized_variogram
[docs] def regularize_variogram(self) -> np.ndarray: r""" Function regularizes semivariograms. Returns ------- reg_variogram : numpy array ``[lag, semivariance]`` Raises ------ ValueError Semivariance at a given lag is below zero. Notes ----- Regularized semivariogram is a difference between the average block to block semivariance :math:`\gamma(v, v_{h})` and the average inblock semivariances :math:`\gamma_{h}(v, v)` at a given lag :math:`h`. .. math:: \gamma_{v}(h)=\gamma(v, v_{h}) - \gamma_{h}(v, v) """ reg_variogram = deepcopy(self.avg_block_to_block_semivariance[:, :-1]) # Get lag and semivariance reg_variogram[:, 1] = ( self.avg_block_to_block_semivariance[:, 1] - self.avg_inblock_semivariance[:, 1] ) if np.any(reg_variogram[:, 1] < 0): for row in reg_variogram: if row[1] < 0: msg = (f'Calculated semivariance of regualrized variogram ' f'for the lag {row[0]} is below zero. ' f'Semivariance value: {row[1]}') raise ValueError(msg) return reg_variogram
[docs] def show_semivariograms(self): """ Method plots: - experimental variogram, - theoretical model, - average inblock semivariance, - average block-to-block semivariance, - regularized variogram. Raises ------ AttributeError The semivariogram regularization process has not been performed. """ if self.regularized_variogram is None: raise AttributeError( 'Variograms may be plot after regularization process. ' 'Use .regularize() method, then plot the results.' ) else: plt.figure(figsize=(12, 6)) # Plot experimental plt.scatter(self.agg_lags, self.experimental_variogram.experimental_semivariances, marker='8', c='black') # Plot theoretical plt.plot( self.agg_lags, self.theoretical_model.fitted_model[:, 1], ':', color='black' ) # Plot average inblock plt.plot( self.agg_lags, self.avg_inblock_semivariance[:, 1], '--', color='#e66101' ) # Plot average block to block plt.plot( self.agg_lags, self.avg_block_to_block_semivariance[:, 1], '--', color='#fdb863' ) # Plot regularized plt.plot( self.agg_lags, self.regularized_variogram[:, 1], color='#5e3c99' ) legend = ['Experimental', 'Theoretical', 'Avg. Inblock', 'Avg. Block-to-block', 'Regularized'] plt.legend(legend) plt.xlabel('Distance') plt.ylabel('Semivariance') plt.show()
def _fit_theoretical_model(self) -> TheoreticalVariogram: """ Method automatically fits experimental curve into the WEIGHTED theoretical model if it wasn't provided as a parameter in regularize() method. Returns ------- theoretical_model : TheoreticalVariogram Weighted theoretical semivariogram. """ theoretical_model = TheoreticalVariogram() theoretical_model.autofit( experimental_variogram=self.experimental_variogram, nugget=self.nugget, models_group=self.models_group, deviation_weighting=self.weighting_method ) return theoretical_model def _get_experimental_variogram(self) -> ExperimentalVariogram: """ Method gets experimental variogram from aggregated data. Returns ------- experimental_variogram : ExperimentalVariogram """ ds = self.blocks.representative_points_array() experimental_variogram = ExperimentalVariogram( ds=ds, step_size=self.step_size, max_range=self.max_range, direction=self.direction, tolerance=self.tolerance ) return experimental_variogram
[docs] def regularize(blocks: Blocks, point_support: PointSupport, step_size: float, max_range: float, nugget: float = 0, direction: float = None, tolerance: float = None, average_inblock_semivariances: np.ndarray = None, semivariance_between_point_supports: np.ndarray = None, experimental_block_semivariances: np.ndarray = None, theoretical_block_model: TheoreticalVariogram = None, variogram_weighting_method: str = 'closest', verbose: bool = False, log_process: bool = False) -> np.ndarray: """ Function is an alias for ``AggregatedVariogram`` class and performs semivariogram regularization. Function returns regularized variogram. Parameters ---------- blocks : Blocks Aggregated data | choropleth map. point_support : PointSupport Point support of ``blocks``. step_size : float Step size between lags. max_range : float Maximal distance of analysis. nugget : float, default = 0 The nugget of blocks data. direction : float (in range [0, 360]), optional, default=0 Direction of ``blocks`` semivariogram, values from 0 to 360 degrees: - 0 or 180: is E-W, - 90 or 270 is N-S, - 45 or 225 is NE-SW, - 135 or 315 is NW-SE. tolerance : float (in range [0, 1]), optional If ``tolerance`` is 0 then points must be placed at a single line with the beginning in the origin of the coordinate system and the direction given by y axis and direction parameter. If ``tolerance`` is ``> 0`` then the bin is selected as an elliptical area with major axis pointed in the same direction as the line for ``0`` tolerance. * The major axis size == ``step_size``. * The minor axis size is ``tolerance * step_size`` * The baseline point is at a center of the ellipse. * The ``tolerance == 1`` creates an omnidirectional semivariogram. average_inblock_semivariances : np.ndarray, default = None The mean semivariance between the blocks. See Notes to learn more. semivariance_between_point_supports : np.ndarray, default = None Semivariance between all blocks calculated from the theoretical model. experimental_block_semivariances : np.ndarray, default = None The experimental semivariance between area coordinates. theoretical_block_model : TheoreticalVariogram, default = None A modeled variogram. variogram_weighting_method : str, default = "closest" Method used to weight error at a given lags. Available methods: - **equal**: no weighting, - **closest**: lags at a close range have bigger custom_weights, - **distant**: lags that are further away have bigger custom_weights, - **dense**: error is weighted by the number of point pairs within a lag - more pairs, smaller weight. verbose : bool, default = False Print steps performed by the algorithm. log_process : bool, default = False Log process info (Level ``DEBUG``). Returns ------- regularized : numpy array ``[lag, semivariance]`` See Also -------- AggregatedVariogram : core class for block semivariogram regularization References ---------- [1] Goovaerts P., Kriging and Semivariogram Deconvolution in the Presence of Irregular Geographical Units, Mathematical Geology 40(1), 101-128, 2008 Examples -------- >>> import os >>> import geopandas as gpd >>> from pyinterpolate import ( ... regularize, ... Blocks, ... PointSupport, ... ) >>> >>> >>> FILENAME = 'cancer_data.gpkg' >>> LAYER_NAME = 'areas' >>> DS = gpd.read_file(FILENAME, layer=LAYER_NAME) >>> AREA_VALUES = 'rate' >>> AREA_INDEX = 'FIPS' >>> AREA_GEOMETRY = 'geometry' >>> PS_LAYER_NAME = 'points' >>> PS_VALUES = 'POP10' >>> PS_GEOMETRY = 'geometry' >>> PS = gpd.read_file(FILENAME, layer=PS_LAYER_NAME) >>> >>> CANCER_DATA = { ... 'ds': DS, ... 'index_column_name': AREA_INDEX, ... 'value_column_name': AREA_VALUES, ... 'geometry_column_name': AREA_GEOMETRY ... } >>> POINT_SUPPORT_DATA = { ... 'ps': PS, ... 'value_column_name': PS_VALUES, ... 'geometry_column_name': PS_GEOMETRY ... } >>> BLOCKS = Blocks(**CANCER_DATA) >>> indexes = BLOCKS.block_indexes >>> >>> PS = PointSupport( ... points=POINT_SUPPORT_DATA['ps'], ... ps_blocks=BLOCKS, ... points_value_column=POINT_SUPPORT_DATA['value_column_name'], ... points_geometry_column=POINT_SUPPORT_DATA['geometry_column_name'] ... ) >>> STEP_SIZE = 20000 >>> MAX_RANGE = 300000 >>> reg_variogram = regularize( ... blocks=BLOCKS, ... point_support=PS, ... step_size=STEP_SIZE, ... max_range=MAX_RANGE ... ) >>> print(reg_variogram[0]) [20000. 53.13549729] """ agg_var = AggregatedVariogram( blocks=blocks, point_support=point_support, step_size=step_size, max_range=max_range, direction=direction, tolerance=tolerance, nugget=nugget, variogram_weighting_method=variogram_weighting_method, verbose=verbose, log_process=log_process ) regularized = agg_var.regularize(average_inblock_semivariances, semivariance_between_point_supports, experimental_block_semivariances, theoretical_block_model) return regularized